From 7944d9a65419dab989bcf1dc722ab9a6e45cd502 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Thu, 11 Nov 2021 00:19:02 -0800
Subject: [PATCH 01/16] move constants; fix types

---
 cpp/include/cudf/io/parquet.hpp    |  4 ++
 cpp/src/io/parquet/writer_impl.cu  | 62 +++++++++++++++---------------
 cpp/src/io/parquet/writer_impl.hpp | 13 ++-----
 3 files changed, 38 insertions(+), 41 deletions(-)

diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 660ec051304..47b9b3b79d8 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -37,6 +37,10 @@ namespace io {
  * @file
  */
 
+constexpr size_t default_rowgroup_size_bytes   = 128 * 1024 * 1024;  // 128MB
+constexpr size_type default_rowgroup_size_rows = 1000000;
+constexpr size_t default_page_size_bytes       = 512 * 1024;  // 512KB
+
 /**
  * @brief Builds parquet_reader_options to use for `read_parquet()`.
  */
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 2ab5d7d696b..ac9f4d4b263 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1146,8 +1146,7 @@ void writer::impl::write(table_view const& table)
   // compression/decompression performance).
   using cudf::io::parquet::gpu::max_page_fragment_size;
 
-  uint32_t num_fragments =
-    (uint32_t)((num_rows + max_page_fragment_size - 1) / max_page_fragment_size);
+  size_type const num_fragments = (num_rows + max_page_fragment_size - 1) / max_page_fragment_size;
 
   cudf::detail::hostdevice_2dvector<gpu::PageFragment> fragments(
     num_columns, num_fragments, stream);
@@ -1160,12 +1159,12 @@ void writer::impl::write(table_view const& table)
     init_page_fragments(fragments, col_desc, num_rows, max_page_fragment_size);
   }
 
-  size_t global_rowgroup_base = md.row_groups.size();
+  auto const global_rowgroup_base = static_cast<size_type>(md.row_groups.size());
 
   // Decide row group boundaries based on uncompressed data size
-  size_t rowgroup_size = 0;
-  uint32_t num_rowgroups = 0;
-  for (uint32_t f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments;
+  size_t rowgroup_size    = 0;
+  size_type num_rowgroups = 0;
+  for (size_type f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments;
        f++) {
     size_t fragment_data_size = 0;
     // Replace with STL algorithm to transform and sum
     for (auto i = 0; i < num_columns; i++) {
      fragment_data_size += fragments[i][f].fragment_data_size;
     }
     if (f > rowgroup_start &&
-        (rowgroup_size + fragment_data_size > max_rowgroup_size_ ||
-         (f + 1 - rowgroup_start) * max_page_fragment_size > max_rowgroup_rows_)) {
+        (rowgroup_size + fragment_data_size > max_rowgroup_size ||
+         (f + 1 - rowgroup_start) * max_page_fragment_size > max_rowgroup_rows)) {
       // update schema
       md.row_groups.resize(md.row_groups.size() + 1);
       md.row_groups[global_r++].num_rows = (f - rowgroup_start) * max_page_fragment_size;
     }
   }
   // Initialize row groups and column chunks
-  uint32_t num_chunks = num_rowgroups * num_columns;
+  auto const num_chunks = num_rowgroups * num_columns;
   hostdevice_2dvector<gpu::EncColumnChunk> chunks(num_rowgroups, num_columns, stream);
-  for (uint32_t r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
+  for (size_type r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
        r++, global_r++) {
-    uint32_t fragments_in_chunk = (uint32_t)(
-      (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size);
+    size_type fragments_in_chunk =
+      (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size;
     md.row_groups[global_r].total_byte_size = 0;
     md.row_groups[global_r].columns.resize(num_columns);
-    for (int i = 0; i < num_columns; i++) {
+    for (size_type i = 0; i < num_columns; i++) {
       gpu::EncColumnChunk* ck = &chunks[r][i];
 
       *ck = {};
 
   auto dict_info_owner = build_chunk_dictionaries(chunks, col_desc, num_rows, stream);
-  for (uint32_t rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) {
-    for (int col = 0; col < num_columns; col++) {
+  for (size_type rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) {
+    for (size_type col = 0; col < num_columns; col++) {
       if (chunks.host_view()[rg][col].use_dictionary) {
         md.row_groups[global_rg].columns[col].meta_data.encodings.push_back(
           Encoding::PLAIN_DICTIONARY);
 
   // Initialize batches of rowgroups to encode (mainly to limit peak memory usage)
-  std::vector<uint32_t> batch_list;
-  uint32_t num_pages = 0;
-  size_t max_bytes_in_batch = 1024 * 1024 * 1024;  // 1GB - TBD: Tune this
-  size_t max_uncomp_bfr_size = 0;
-  size_t max_comp_bfr_size = 0;
-  size_t max_chunk_bfr_size = 0;
-  uint32_t max_pages_in_batch = 0;
-  size_t bytes_in_batch = 0;
-  size_t comp_bytes_in_batch = 0;
-  for (uint32_t r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) {
+  std::vector<size_type> batch_list;
+  size_type num_pages = 0;
+  size_t max_bytes_in_batch = 1024 * 1024 * 1024;  // 1GB - TBD: Tune this
+  size_t max_uncomp_bfr_size = 0;
+  size_t max_comp_bfr_size = 0;
+  size_t max_chunk_bfr_size = 0;
+  size_type max_pages_in_batch = 0;
+  size_t bytes_in_batch = 0;
+  size_t comp_bytes_in_batch = 0;
+  for (size_type r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) {
     size_t rowgroup_size = 0;
     size_t comp_rowgroup_size = 0;
     if (r < num_rowgroups) {
 
   for (uint32_t b = 0, r = 0; b < (uint32_t)batch_list.size(); b++) {
     uint8_t* bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
     uint8_t* bfr_c = static_cast<uint8_t*>(comp_bfr.data());
-    for (uint32_t j = 0; j < batch_list[b]; j++, r++) {
+    for (size_type j = 0; j < batch_list[b]; j++, r++) {
       for (int i = 0; i < num_columns; i++) {
         gpu::EncColumnChunk* ck = &chunks[r][i];
         ck->uncompressed_bfr = bfr;
 
   pinned_buffer<uint8_t> host_bfr{nullptr, cudaFreeHost};
 
   // Encode row groups in batches
-  for (uint32_t b = 0, r = 0, global_r = global_rowgroup_base; b < (uint32_t)batch_list.size();
+  for (size_type b = 0, r = 0, global_r = global_rowgroup_base;
+       b < static_cast<size_type>(batch_list.size());
        b++) {
     // Count pages in this batch
-    uint32_t rnext = r + batch_list[b];
-    uint32_t first_page_in_batch = chunks[r][0].first_page;
-    uint32_t first_page_in_next_batch =
-      (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages;
-    uint32_t pages_in_batch = first_page_in_next_batch - first_page_in_batch;
+    auto const rnext               = r + batch_list[b];
+    auto const first_page_in_batch = chunks[r][0].first_page;
+    auto const first_page_in_next_batch =
+      (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages;
+    auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch;
     // device_span<gpu::EncPage> batch_pages{pages.data() + first_page_in_batch, }
     encode_pages(
       chunks,
diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp
index c7cdf8effd1..20a9b77625f 100644
--- a/cpp/src/io/parquet/writer_impl.hpp
+++ b/cpp/src/io/parquet/writer_impl.hpp
@@ -56,13 +56,6 @@ using cudf::detail::hostdevice_2dvector;
  * @brief Implementation for parquet writer
  */
 class writer::impl {
-  // Parquet datasets are divided into fixed-size, independent rowgroups
-  static constexpr uint32_t DEFAULT_ROWGROUP_MAXSIZE = 128 * 1024 * 1024;  // 128MB
-  static constexpr uint32_t DEFAULT_ROWGROUP_MAXROWS = 1000000;  // Or at most 1M rows
-
-  // rowgroups are divided into pages
-  static constexpr uint32_t DEFAULT_TARGET_PAGE_SIZE = 512 * 1024;
-
  public:
   /**
    * @brief Constructor with writer options.
@@ -209,9 +202,9 @@ class writer::impl {
   // Cuda stream to be used
   rmm::cuda_stream_view stream = rmm::cuda_stream_default;
 
-  size_t max_rowgroup_size_ = DEFAULT_ROWGROUP_MAXSIZE;
-  size_t max_rowgroup_rows_ = DEFAULT_ROWGROUP_MAXROWS;
-  size_t target_page_size_  = DEFAULT_TARGET_PAGE_SIZE;
+  size_t max_rowgroup_size    = default_rowgroup_size_bytes;
+  size_type max_rowgroup_rows = default_rowgroup_size_rows;
+  size_t target_page_size     = default_page_size_bytes;
   Compression compression_ = Compression::UNCOMPRESSED;
   statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
   bool int96_timestamps = false;

From eb9610d81697d9b11fb134c02128a89efe86d2b4 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Thu, 11 Nov 2021 00:48:58 -0800
Subject: [PATCH 02/16] API

---
 cpp/include/cudf/io/parquet.hpp | 174 ++++++++++++++++++++++++++++++++
 1 file changed, 174 insertions(+)

diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 47b9b3b79d8..2dba68c1cfa 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -402,6 +402,12 @@ class parquet_writer_options {
   bool _write_timestamps_as_int96 = false;
   // Column chunks file path to be set in the raw output metadata
   std::string _column_chunks_file_path;
+  // Maximum size of each rowgroup (unless smaller than a single page)
+  size_t _rowgroup_size_bytes = default_rowgroup_size_bytes;
+  // Maximum number of rows in rowgroup (unless smaller than a single page)
+  size_type _rowgroup_size_rows = default_rowgroup_size_rows;
+  // Target size of each page
+  size_t _page_size_bytes = default_page_size_bytes;
 
   /**
    * @brief Constructor from sink and table.
@@ -476,6 +482,21 @@ class parquet_writer_options {
    */
   std::string get_column_chunks_file_path() const { return _column_chunks_file_path; }
 
+  /**
+   * @brief Returns maximum rowgroup size, in bytes.
+   */
+  auto rowgroup_size_bytes() const { return _rowgroup_size_bytes; }
+
+  /**
+   * @brief Returns maximum rowgroup size, in rows.
+   */
+  auto rowgroup_size_rows() const { return _rowgroup_size_rows; }
+
+  /**
+   * @brief Returns the target page size, in bytes.
+   */
+  auto page_size_bytes() const { return std::min(_page_size_bytes, rowgroup_size_bytes()); }
+
   /**
    * @brief Sets metadata.
    *
@@ -514,6 +535,36 @@ class parquet_writer_options {
   {
     _column_chunks_file_path.assign(file_path);
   }
+
+  /**
+   * @brief Sets the maximum rowgroup size, in bytes.
+ * + * If the rowgroup size is smaller that the page size, page size will be reduced to match + * the rowgroup size. + */ + void set_rowgroup_size_bytes(size_t size_bytes) + { + CUDF_EXPECTS(size_bytes >= 64 << 10, "64KB is the minimum rowgorup size"); + _rowgroup_size_bytes = size_bytes; + } + + /** + * @brief Sets the maximum rowgroup size, in rows. + */ + void set_rowgroup_size_rows(size_type size_rows) + { + CUDF_EXPECTS(size_rows >= 512, "Maximum rowgroup size cannot be smaller than 512"); + _rowgroup_size_rows = size_rows; + } + + /** + * @brief Sets the target size for each page. + */ + void set_page_size_bytes(size_type size_bytes) + { + CUDF_EXPECTS(size_bytes >= 512, "Page size cannot be smaller than 512"); + _page_size_bytes = size_bytes; + } }; class parquet_writer_options_builder { @@ -586,6 +637,42 @@ class parquet_writer_options_builder { return *this; } + /** + * @brief Sets the maximum rowgroup size, in bytes. + * + * @param val maximum rowgroup size + * @return this for chaining. + */ + parquet_writer_options_builder& rowgroup_size_bytes(size_t val) + { + options.set_rowgroup_size_bytes(val); + return *this; + } + + /** + * @brief Sets the maximum number of rows in output rowgroups. + * + * @param val maximum number or rows + * @return this for chaining. + */ + parquet_writer_options_builder& stripe_size_rows(size_type val) + { + options.set_rowgroup_size_rows(val); + return *this; + } + + /** + * @brief Sets the target page size. + * + * @param val new target page size + * @return this for chaining. + */ + parquet_writer_options_builder& page_size_bytes(size_type val) + { + options.set_page_size_bytes(val); + return *this; + } + /** * @brief Sets whether int96 timestamps are written or not in parquet_writer_options. * @@ -664,6 +751,12 @@ class chunked_parquet_writer_options { // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. // If true then overrides any per-column setting in _metadata. bool _write_timestamps_as_int96 = false; + // Maximum size of each rowgroup (unless smaller than a single page) + size_t _rowgroup_size_bytes = default_rowgroup_size_bytes; + // Maximum number of rows in rowgroup (unless smaller than a single page) + size_type _rowgroup_size_rows = default_rowgroup_size_rows; + // Target size of each page + size_t _page_size_bytes = default_page_size_bytes; /** * @brief Constructor from sink. @@ -707,6 +800,21 @@ class chunked_parquet_writer_options { */ bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } + /** + * @brief Returns maximum rowgroup size, in bytes. + */ + auto rowgroup_size_bytes() const { return _rowgroup_size_bytes; } + + /** + * @brief Returns maximum rowgroup size, in rows. + */ + auto rowgroup_size_rows() const { return _rowgroup_size_rows; } + + /** + * @brief Returns the target page size, in bytes. + */ + auto page_size_bytes() const { return std::min(_page_size_bytes, rowgroup_size_bytes()); } + /** * @brief Sets metadata. * @@ -736,6 +844,36 @@ class chunked_parquet_writer_options { */ void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + /** + * @brief Sets the maximum rowgroup size, in bytes. + * + * If the rowgroup size is smaller that the page size, page size will be reduced to match + * the rowgroup size. + */ + void set_rowgroup_size_bytes(size_t size_bytes) + { + CUDF_EXPECTS(size_bytes >= 64 << 10, "64KB is the minimum rowgorup size"); + _rowgroup_size_bytes = size_bytes; + } + + /** + * @brief Sets the maximum rowgroup size, in rows. 
+ */ + void set_rowgroup_size_rows(size_type size_rows) + { + CUDF_EXPECTS(size_rows >= 512, "Maximum rowgroup size cannot be smaller than 512"); + _rowgroup_size_rows = size_rows; + } + + /** + * @brief Sets the target size for each page. + */ + void set_page_size_bytes(size_type size_bytes) + { + CUDF_EXPECTS(size_bytes >= 512, "Page size cannot be smaller than 512"); + _page_size_bytes = size_bytes; + } + /** * @brief creates builder to build chunked_parquet_writer_options. * @@ -815,6 +953,42 @@ class chunked_parquet_writer_options_builder { return *this; } + /** + * @brief Sets the maximum rowgroup size, in bytes. + * + * @param val maximum rowgroup size + * @return this for chaining. + */ + chunked_parquet_writer_options_builder& rowgroup_size_bytes(size_t val) + { + options.set_rowgroup_size_bytes(val); + return *this; + } + + /** + * @brief Sets the maximum number of rows in output rowgroups. + * + * @param val maximum number or rows + * @return this for chaining. + */ + chunked_parquet_writer_options_builder& stripe_size_rows(size_type val) + { + options.set_rowgroup_size_rows(val); + return *this; + } + + /** + * @brief Sets the target page size. + * + * @param val new target page size + * @return this for chaining. + */ + chunked_parquet_writer_options_builder& page_size_bytes(size_type val) + { + options.set_page_size_bytes(val); + return *this; + } + /** * @brief move chunked_parquet_writer_options member once it's built. */ From d67f5bfe77919b3e4afd3d052201829f50355f04 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 11 Nov 2021 13:31:30 -0800 Subject: [PATCH 03/16] rename APIs to match other getters --- cpp/include/cudf/io/orc.hpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 2a95b85465b..1174d3289fc 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -445,24 +445,24 @@ class orc_writer_options { /** * @brief Whether writing column statistics is enabled/disabled. */ - bool enable_statistics() const { return _enable_statistics; } + bool is_enabled_statistics() const { return _enable_statistics; } /** * @brief Returns maximum stripe size, in bytes. */ - auto stripe_size_bytes() const { return _stripe_size_bytes; } + auto get_stripe_size_bytes() const { return _stripe_size_bytes; } /** * @brief Returns maximum stripe size, in rows. */ - auto stripe_size_rows() const { return _stripe_size_rows; } + auto get_stripe_size_rows() const { return _stripe_size_rows; } /** * @brief Returns the row index stride. */ - auto row_index_stride() const + auto get_row_index_stride() const { - auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows()); + auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows()); return unaligned_stride - unaligned_stride % 8; } @@ -739,24 +739,24 @@ class chunked_orc_writer_options { /** * @brief Whether writing column statistics is enabled/disabled. */ - bool enable_statistics() const { return _enable_statistics; } + bool is_enabled_statistics() const { return _enable_statistics; } /** * @brief Returns maximum stripe size, in bytes. */ - auto stripe_size_bytes() const { return _stripe_size_bytes; } + auto get_stripe_size_bytes() const { return _stripe_size_bytes; } /** * @brief Returns maximum stripe size, in rows. 
*/ - auto stripe_size_rows() const { return _stripe_size_rows; } + auto get_stripe_size_rows() const { return _stripe_size_rows; } /** * @brief Returns the row index stride. */ - auto row_index_stride() const + auto get_row_index_stride() const { - auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows()); + auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows()); return unaligned_stride - unaligned_stride % 8; } From 0e16b424bbd026f1195978fafab82abe3374c428 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 11 Nov 2021 13:33:30 -0800 Subject: [PATCH 04/16] fix naming; remove unused API --- cpp/include/cudf/io/parquet.hpp | 162 +++++++++-------------------- cpp/src/io/orc/writer_impl.cu | 12 +-- cpp/src/io/parquet/writer_impl.cu | 8 +- cpp/src/io/parquet/writer_impl.hpp | 5 +- 4 files changed, 65 insertions(+), 122 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 2dba68c1cfa..b53ba2f5a15 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -37,9 +37,8 @@ namespace io { * @file */ -constexpr size_t default_rowgroup_size_bytes = 128 * 1024 * 1024; // 128MB -constexpr size_type default_rowgroup_size_rows = 1000000; -constexpr size_t default_page_size_bytes = 512 * 1024; // 512KB +constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB +constexpr size_type default_row_group_size_rows = 1000000; /** * @brief Builds parquet_reader_options to use for `read_parquet()`. @@ -402,12 +401,10 @@ class parquet_writer_options { bool _write_timestamps_as_int96 = false; // Column chunks file path to be set in the raw output metadata std::string _column_chunks_file_path; - // Maximum size of each rowgroup (unless smaller than a single page) - size_t _rowgroup_size_bytes = default_rowgroup_size_bytes; - // Maximum number of rows in rowgroup (unless smaller than a single page) - size_type _rowgroup_size_rows = default_rowgroup_size_rows; - // Target size of each page - size_t _page_size_bytes = default_page_size_bytes; + // Maximum size of each row group (unless smaller than a single page) + size_t _row_group_size_bytes = default_row_group_size_bytes; + // Maximum number of rows in row group (unless smaller than a single page) + size_type _row_group_size_rows = default_row_group_size_rows; /** * @brief Constructor from sink and table. @@ -483,19 +480,14 @@ class parquet_writer_options { std::string get_column_chunks_file_path() const { return _column_chunks_file_path; } /** - * @brief Returns maximum rowgroup size, in bytes. + * @brief Returns maximum row group size, in bytes. */ - auto rowgroup_size_bytes() const { return _rowgroup_size_bytes; } + auto get_row_group_size_bytes() const { return _row_group_size_bytes; } /** - * @brief Returns maximum rowgroup size, in rows. + * @brief Returns maximum rowngroup size, in rows. */ - auto rowgroup_size_rows() const { return _rowgroup_size_rows; } - - /** - * @brief Returns the target page size, in bytes. - */ - auto page_size_bytes() const { return std::min(_page_size_bytes, rowgroup_size_bytes()); } + auto get_row_group_size_rows() const { return _row_group_size_rows; } /** * @brief Sets metadata. @@ -537,33 +529,24 @@ class parquet_writer_options { } /** - * @brief Sets the maximum rowgroup size, in bytes. + * @brief Sets the maximum row group size, in bytes. * - * If the rowgroup size is smaller that the page size, page size will be reduced to match - * the rowgroup size. 
+   * If the row group size is smaller that the page size, page size will be reduced to match
+   * the row_group size.
    */
-  void set_rowgroup_size_bytes(size_t size_bytes)
+  void set_row_group_size_bytes(size_t size_bytes)
   {
-    CUDF_EXPECTS(size_bytes >= 64 << 10, "64KB is the minimum rowgorup size");
-    _rowgroup_size_bytes = size_bytes;
+    CUDF_EXPECTS(size_bytes >= 512 << 10, "512KB is the minimum rowgorup size");
+    _row_group_size_bytes = size_bytes;
   }
 
   /**
-   * @brief Sets the maximum rowgroup size, in rows.
+   * @brief Sets the maximum row group size, in rows.
    */
-  void set_rowgroup_size_rows(size_type size_rows)
+  void set_row_group_size_rows(size_type size_rows)
   {
-    CUDF_EXPECTS(size_rows >= 512, "Maximum rowgroup size cannot be smaller than 512");
-    _rowgroup_size_rows = size_rows;
-  }
-
-  /**
-   * @brief Sets the target size for each page.
-   */
-  void set_page_size_bytes(size_type size_bytes)
-  {
-    CUDF_EXPECTS(size_bytes >= 512, "Page size cannot be smaller than 512");
-    _page_size_bytes = size_bytes;
+    CUDF_EXPECTS(size_rows >= 5000, "Maximum row group size cannot be smaller than 5000");
+    _row_group_size_rows = size_rows;
   }
 };
 
@@ -621,38 +621,26 @@ class parquet_writer_options_builder {
   /**
-   * @brief Sets the maximum rowgroup size, in bytes.
+   * @brief Sets the maximum row group size, in bytes.
    *
-   * @param val maximum rowgroup size
+   * @param val maximum row group size
    * @return this for chaining.
    */
-  parquet_writer_options_builder& rowgroup_size_bytes(size_t val)
+  parquet_writer_options_builder& row_group_size_bytes(size_t val)
   {
-    options.set_rowgroup_size_bytes(val);
+    options.set_row_group_size_bytes(val);
     return *this;
   }
 
   /**
-   * @brief Sets the maximum number of rows in output rowgroups.
+   * @brief Sets the maximum number of rows in output row groups.
    *
    * @param val maximum number or rows
    * @return this for chaining.
    */
-  parquet_writer_options_builder& stripe_size_rows(size_type val)
+  parquet_writer_options_builder& row_group_size_rows(size_type val)
   {
-    options.set_rowgroup_size_rows(val);
+    options.set_row_group_size_rows(val);
     return *this;
   }
 
-  /**
-   * @brief Sets the target page size.
-   *
-   * @param val new target page size
-   * @return this for chaining.
-   */
-  parquet_writer_options_builder& page_size_bytes(size_type val)
-  {
-    options.set_page_size_bytes(val);
-    return *this;
-  }
-
   /**
    * @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
    *
@@ -728,7 +699,7 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(
  * @param[in] metadata_list List of input file metadata.
  * @return A parquet-compatible blob that contains the data for all row groups in the list.
 */
-std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
+std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
   const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);
 
 /**
@@ -751,12 +722,10 @@ class chunked_parquet_writer_options {
   // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
   // If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false; - // Maximum size of each rowgroup (unless smaller than a single page) - size_t _rowgroup_size_bytes = default_rowgroup_size_bytes; - // Maximum number of rows in rowgroup (unless smaller than a single page) - size_type _rowgroup_size_rows = default_rowgroup_size_rows; - // Target size of each page - size_t _page_size_bytes = default_page_size_bytes; + // Maximum size of each row group (unless smaller than a single page) + size_t _row_group_size_bytes = default_row_group_size_bytes; + // Maximum number of rows in row group (unless smaller than a single page) + size_type _row_group_size_rows = default_row_group_size_rows; /** * @brief Constructor from sink. @@ -801,19 +770,14 @@ class chunked_parquet_writer_options { bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } /** - * @brief Returns maximum rowgroup size, in bytes. + * @brief Returns maximum row group size, in bytes. */ - auto rowgroup_size_bytes() const { return _rowgroup_size_bytes; } + auto get_row_group_size_bytes() const { return _row_group_size_bytes; } /** - * @brief Returns maximum rowgroup size, in rows. + * @brief Returns maximum row group size, in rows. */ - auto rowgroup_size_rows() const { return _rowgroup_size_rows; } - - /** - * @brief Returns the target page size, in bytes. - */ - auto page_size_bytes() const { return std::min(_page_size_bytes, rowgroup_size_bytes()); } + auto get_row_group_size_rows() const { return _row_group_size_rows; } /** * @brief Sets metadata. @@ -845,33 +809,21 @@ class chunked_parquet_writer_options { void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } /** - * @brief Sets the maximum rowgroup size, in bytes. - * - * If the rowgroup size is smaller that the page size, page size will be reduced to match - * the rowgroup size. - */ - void set_rowgroup_size_bytes(size_t size_bytes) - { - CUDF_EXPECTS(size_bytes >= 64 << 10, "64KB is the minimum rowgorup size"); - _rowgroup_size_bytes = size_bytes; - } - - /** - * @brief Sets the maximum rowgroup size, in rows. + * @brief Sets the maximum row group size, in bytes. */ - void set_rowgroup_size_rows(size_type size_rows) + void set_row_group_size_bytes(size_t size_bytes) { - CUDF_EXPECTS(size_rows >= 512, "Maximum rowgroup size cannot be smaller than 512"); - _rowgroup_size_rows = size_rows; + CUDF_EXPECTS(size_bytes >= 512 << 10, "512KB is the minimum rowgorup size"); + _row_group_size_bytes = size_bytes; } /** - * @brief Sets the target size for each page. + * @brief Sets the maximum row group size, in rows. */ - void set_page_size_bytes(size_type size_bytes) + void set_row_group_size_rows(size_type size_rows) { - CUDF_EXPECTS(size_bytes >= 512, "Page size cannot be smaller than 512"); - _page_size_bytes = size_bytes; + CUDF_EXPECTS(size_rows >= 5000, "Maximum row group size cannot be smaller than 5000"); + _row_group_size_rows = size_rows; } /** @@ -954,38 +906,26 @@ class chunked_parquet_writer_options_builder { } /** - * @brief Sets the maximum rowgroup size, in bytes. + * @brief Sets the maximum row group size, in bytes. * - * @param val maximum rowgroup size + * @param val maximum row group size * @return this for chaining. */ - chunked_parquet_writer_options_builder& rowgroup_size_bytes(size_t val) + chunked_parquet_writer_options_builder& row_group_size_bytes(size_t val) { - options.set_rowgroup_size_bytes(val); + options.set_row_group_size_bytes(val); return *this; } /** - * @brief Sets the maximum number of rows in output rowgroups. 
+   * @brief Sets the maximum number of rows in output row groups.
    *
    * @param val maximum number or rows
    * @return this for chaining.
    */
-  chunked_parquet_writer_options_builder& stripe_size_rows(size_type val)
-  {
-    options.set_rowgroup_size_rows(val);
-    return *this;
-  }
-
-  /**
-   * @brief Sets the target page size.
-   *
-   * @param val new target page size
-   * @return this for chaining.
-   */
-  chunked_parquet_writer_options_builder& page_size_bytes(size_type val)
+  chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
   {
-    options.set_page_size_bytes(val);
+    options.set_row_group_size_rows(val);
     return *this;
   }
 
diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu
index 1e580e360ca..d377f55f741 100644
--- a/cpp/src/io/orc/writer_impl.cu
+++ b/cpp/src/io/orc/writer_impl.cu
@@ -1296,10 +1296,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
-    max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
-    row_index_stride{options.row_index_stride()},
+    max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
+    row_index_stride{options.get_row_index_stride()},
     compression_kind_(to_orc_compression(options.get_compression())),
-    enable_statistics_(options.enable_statistics()),
+    enable_statistics_(options.is_enabled_statistics()),
     single_write_mode(mode == SingleWriteMode::YES),
     out_sink_(std::move(sink))
 {
@@ -1316,10 +1316,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
-    max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
-    row_index_stride{options.row_index_stride()},
+    max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
+    row_index_stride{options.get_row_index_stride()},
     compression_kind_(to_orc_compression(options.get_compression())),
-    enable_statistics_(options.enable_statistics()),
+    enable_statistics_(options.is_enabled_statistics()),
     single_write_mode(mode == SingleWriteMode::YES),
     out_sink_(std::move(sink))
 {
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index ac9f4d4b263..81775bcbdc4 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1024,6 +1024,8 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
+    max_row_group_size{options.get_row_group_size_bytes()},
+    max_row_group_rows{options.get_row_group_size_rows()},
     compression_(to_parquet_compression(options.get_compression())),
     stats_granularity_(options.get_stats_level()),
     int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1043,6 +1045,8 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
+    max_row_group_size{options.get_row_group_size_bytes()},
+    max_row_group_rows{options.get_row_group_size_rows()},
     compression_(to_parquet_compression(options.get_compression())),
     stats_granularity_(options.get_stats_level()),
     int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1172,8 +1176,8 @@ void writer::impl::write(table_view const& table)
       fragment_data_size += fragments[i][f].fragment_data_size;
     }
     if (f > rowgroup_start &&
-        (rowgroup_size + fragment_data_size > max_rowgroup_size ||
-         (f + 1 - rowgroup_start) * max_page_fragment_size > max_rowgroup_rows)) {
+        (rowgroup_size + fragment_data_size > max_row_group_size ||
+         (f + 1 - rowgroup_start) * max_page_fragment_size > max_row_group_rows)) {
       // update schema
       md.row_groups.resize(md.row_groups.size() + 1);
       md.row_groups[global_r++].num_rows = (f - rowgroup_start) * max_page_fragment_size;
diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp
index 20a9b77625f..9188218f607 100644
--- a/cpp/src/io/parquet/writer_impl.hpp
+++ b/cpp/src/io/parquet/writer_impl.hpp
@@ -202,9 +202,8 @@ class writer::impl {
   // Cuda stream to be used
   rmm::cuda_stream_view stream = rmm::cuda_stream_default;
 
-  size_t max_rowgroup_size    = default_rowgroup_size_bytes;
-  size_type max_rowgroup_rows = default_rowgroup_size_rows;
-  size_t target_page_size     = default_page_size_bytes;
+  size_t max_row_group_size    = default_row_group_size_bytes;
+  size_type max_row_group_rows = default_row_group_size_rows;
   Compression compression_ = Compression::UNCOMPRESSED;
   statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
   bool int96_timestamps = false;

From c3a3420a8a108b6937b8be8b125fa5c0eb984e65 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Thu, 11 Nov 2021 15:57:54 -0800
Subject: [PATCH 05/16] rename merge_rowgroup_metadata

---
 cpp/include/cudf/io/detail/parquet.hpp   | 2 +-
 cpp/src/io/functions.cpp                 | 6 +++---
 cpp/src/io/parquet/writer_impl.cu        | 2 +-
 python/cudf/cudf/_lib/cpp/io/parquet.pxd | 2 +-
 python/cudf/cudf/_lib/parquet.pyx        | 6 +++---
 5 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp
index 14f27ef8eef..98922ad10a4 100644
--- a/cpp/include/cudf/io/detail/parquet.hpp
+++ b/cpp/include/cudf/io/detail/parquet.hpp
@@ -148,7 +148,7 @@ class writer {
    * @param[in] metadata_list List of input file metadata
    * @return A parquet-compatible blob that contains the data for all rowgroups in the list
    */
-  static std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
+  static std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
    const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);
 };
 
diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp
index b678941db21..a8ca1d3a459 100644
--- a/cpp/src/io/functions.cpp
+++ b/cpp/src/io/functions.cpp
@@ -405,13 +405,13 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
 }
 
 /**
- * @copydoc cudf::io::merge_rowgroup_metadata
+ * @copydoc cudf::io::merge_row_group_metadata
 */
-std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
+std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
   const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list)
 {
   CUDF_FUNC_RANGE();
-  return detail_parquet::writer::merge_rowgroup_metadata(metadata_list);
+  return detail_parquet::writer::merge_row_group_metadata(metadata_list);
 }
 
 table_input_metadata::table_input_metadata(table_view const& table,
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 81775bcbdc4..4c22f3d0590 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1516,7 +1516,7 @@ std::unique_ptr<std::vector<uint8_t>> writer::close(std::string const& column_ch
   return _impl->close(column_chunks_file_path);
 }
 
-std::unique_ptr<std::vector<uint8_t>> writer::merge_rowgroup_metadata(
+std::unique_ptr<std::vector<uint8_t>> writer::merge_row_group_metadata(
   const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list)
 {
   std::vector<uint8_t> output;
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index 81ca7e5836b..6156564c0df 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -173,6 +173,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
             string column_chunks_file_path,
         ) except+
 
-    cdef unique_ptr[vector[uint8_t]] merge_rowgroup_metadata(
+    cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata(
         const vector[unique_ptr[vector[uint8_t]]]& metadata_list
     ) except +
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 9c24e5becfd..62a2713a6c7 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -45,7 +45,7 @@ from cudf._lib.column cimport Column
 from cudf._lib.cpp.io.parquet cimport (
     chunked_parquet_writer_options,
     chunked_parquet_writer_options_builder,
-    merge_rowgroup_metadata as parquet_merge_metadata,
+    merge_row_group_metadata as parquet_merge_metadata,
     parquet_chunked_writer as cpp_parquet_chunked_writer,
     parquet_reader_options,
     parquet_writer_options,
@@ -479,11 +479,11 @@ cdef class ParquetWriter:
 
 cpdef merge_filemetadata(object filemetadata_list):
     """
-    Cython function to call into libcudf API, see `merge_rowgroup_metadata`.
+    Cython function to call into libcudf API, see `merge_row_group_metadata`.
 
     See Also
     --------
-    cudf.io.parquet.merge_rowgroup_metadata
+    cudf.io.parquet.merge_row_group_metadata
     """
     cdef vector[unique_ptr[vector[uint8_t]]] list_c
     cdef vector[uint8_t] blob_c

From e3ae90952568df3474bb3662446c4591409c4a69 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Thu, 11 Nov 2021 15:58:08 -0800
Subject: [PATCH 06/16] negative C++ test

---
 cpp/tests/io/parquet_test.cpp | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 3bae8d7ab1e..db126d19bb5 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -3056,4 +3056,26 @@ TEST_F(ParquetReaderTest, EmptyOutput)
   CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
 }
 
+TEST_F(ParquetWriterTest, StripeSizeInvalid)
+{
+  const auto unused_table = std::make_unique<table>();
+  std::vector<char> out_buffer;
+
+  EXPECT_THROW(
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .row_group_size_rows(4999),
+    cudf::logic_error);
+  EXPECT_THROW(
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .row_group_size_bytes(511 << 10),
+    cudf::logic_error);
+
+  EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                 .row_group_size_rows(4999),
+               cudf::logic_error);
+  EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                 .row_group_size_bytes(511 << 10),
+               cudf::logic_error);
+}
+
 CUDF_TEST_PROGRAM_MAIN()

From eff9cbd9938b2ca900c1df93b78748e7e433b177 Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Thu, 11 Nov 2021 17:16:29 -0800
Subject: [PATCH 07/16] Python API

---
 python/cudf/cudf/_lib/cpp/io/orc.pxd     |  4 ++--
 python/cudf/cudf/_lib/cpp/io/parquet.pxd | 20 +++++++++++++++++
 python/cudf/cudf/_lib/parquet.pyx        | 28 ++++++++++++++----------
 python/cudf/cudf/io/parquet.py           |  6 +++++
 4 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/python/cudf/cudf/_lib/cpp/io/orc.pxd b/python/cudf/cudf/_lib/cpp/io/orc.pxd
index c855f112692..c1325d43539 100644
--- a/python/cudf/cudf/_lib/cpp/io/orc.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/orc.pxd
@@ -70,8 +70,8 @@ cdef extern from "cudf/io/orc.hpp" \
         cudf_io_types.compression_type get_compression() except+
         bool enable_statistics() except+
         size_t stripe_size_bytes() except+
-        size_type stripe_size_rows() except+
-        size_type row_index_stride() except+
+        size_type get_stripe_size_rows() except+
+        size_type get_row_index_stride() except+
         cudf_table_view.table_view get_table() except+
         const cudf_io_types.table_input_metadata *get_metadata() except+
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index 6156564c0df..9d95dce83bc 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -74,6 +74,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         cudf_table_view.table_view get_table() except +
         const cudf_io_types.table_input_metadata get_metadata() except +
         string get_column_chunks_file_path() except+
+        size_t get_row_group_size_bytes() except+
+        size_type get_row_group_size_rows() except+
 
         void set_metadata(
             cudf_io_types.table_input_metadata *m
@@ -87,6 +89,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         void set_column_chunks_file_path(
             string column_chunks_file_path
         ) except +
+        void set_row_group_size_bytes(size_t val) except+
+        void set_row_group_size_rows(size_type val) except+
 
         @staticmethod
         parquet_writer_options_builder builder(
@@ -116,6 +120,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         parquet_writer_options_builder& int96_timestamps(
             bool enabled
         ) except +
+        parquet_writer_options_builder& row_group_size_bytes(
+            size_t val
+        ) except+
+        parquet_writer_options_builder& row_group_size_rows(
+            size_type val
+        ) except+
 
         parquet_writer_options build() except +
 
@@ -130,6 +140,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         cudf_io_types.statistics_freq get_stats_level() except +
         cudf_io_types.table_input_metadata* get_metadata(
         ) except+
+        size_t get_row_group_size_bytes() except+
+        size_type get_row_group_size_rows() except+
 
         void set_metadata(
             cudf_io_types.table_input_metadata *m
@@ -140,6 +152,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         void set_compression(
             cudf_io_types.compression_type compression
         ) except +
+        void set_row_group_size_bytes(size_t val) except+
+        void set_row_group_size_rows(size_type val) except+
 
         @staticmethod
         chunked_parquet_writer_options_builder builder(
@@ -160,6 +174,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         chunked_parquet_writer_options_builder& compression(
             cudf_io_types.compression_type compression
         ) except +
+        parquet_writer_options_builder& row_group_size_bytes(
+            size_t val
+        ) except+
+        parquet_writer_options_builder& row_group_size_rows(
+            size_type val
+        ) except+
 
         chunked_parquet_writer_options build() except +
 
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 62a2713a6c7..945e6524e16 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -278,7 +278,9 @@ cpdef write_parquet(
     object compression="snappy",
     object statistics="ROWGROUP",
     object metadata_file_path=None,
-    object int96_timestamps=False):
+    object int96_timestamps=False,
+    object row_group_size_bytes=None,
+    object row_group_size_rows=None):
     """
     Cython function to call into libcudf API, see `write_parquet`.
@@ -330,7 +332,6 @@ cpdef write_parquet( cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression) cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics) - cdef parquet_writer_options args cdef unique_ptr[vector[uint8_t]] out_metadata_c cdef string c_column_chunks_file_path cdef bool _int96_timestamps = int96_timestamps @@ -338,16 +339,21 @@ cpdef write_parquet( c_column_chunks_file_path = str.encode(metadata_file_path) # Perform write + cdef parquet_writer_options args = move( + parquet_writer_options.builder(sink, tv) + .metadata(tbl_meta.get()) + .compression(comp_type) + .stats_level(stat_freq) + .column_chunks_file_path(c_column_chunks_file_path) + .int96_timestamps(_int96_timestamps) + .build() + ) + if row_group_size_bytes is not None: + args.set_row_group_size_bytes(row_group_size_bytes) + if row_group_size_rows is not None: + args.set_row_group_size_rows(row_group_size_rows) + with nogil: - args = move( - parquet_writer_options.builder(sink, tv) - .metadata(tbl_meta.get()) - .compression(comp_type) - .stats_level(stat_freq) - .column_chunks_file_path(c_column_chunks_file_path) - .int96_timestamps(_int96_timestamps) - .build() - ) out_metadata_c = move(parquet_writer(args)) if metadata_file_path is not None: diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a60ec07b894..35a0241cc48 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -419,6 +419,8 @@ def to_parquet( statistics="ROWGROUP", metadata_file_path=None, int96_timestamps=False, + row_group_size_bytes=None, + row_group_size_rows=None, *args, **kwargs, ): @@ -458,6 +460,8 @@ def to_parquet( statistics=statistics, metadata_file_path=metadata_file_path, int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, ) else: write_parquet_res = libparquet.write_parquet( @@ -468,6 +472,8 @@ def to_parquet( statistics=statistics, metadata_file_path=metadata_file_path, int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, ) return write_parquet_res From 86991740a47dfa36d5a14b495dcac550feff1e09 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 11 Nov 2021 17:16:46 -0800 Subject: [PATCH 08/16] Python test --- python/cudf/cudf/tests/test_parquet.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 9629e502584..e17277157f9 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -233,6 +233,11 @@ def _make_parquet_path_or_buf(src): yield _make_parquet_path_or_buf +@pytest.fixture(scope="module") +def large_int64_gdf(): + return cudf.DataFrame.from_pandas(pd.DataFrame({"col": range(0, 1000000)})) + + @pytest.mark.filterwarnings("ignore:Using CPU") @pytest.mark.parametrize("engine", ["pyarrow", "cudf"]) @pytest.mark.parametrize( @@ -2139,3 +2144,19 @@ def test_parquet_decimal_precision_empty(tmpdir): df.to_parquet(fname) df = cudf.read_parquet(fname) assert df.val.dtype.precision == 5 + + +@pytest.mark.parametrize("size_bytes", [4 << 20, 1 << 20, 512 << 10]) +@pytest.mark.parametrize("size_rows", [1000000, 100000, 10000]) +def test_parquet_writer_row_group_size( + tmpdir, large_int64_gdf, size_bytes, size_rows +): + fname = tmpdir.join("row_group_size.parquet") + large_int64_gdf.to_parquet( + fname, row_group_size_bytes=size_bytes, 
row_group_size_rows=size_rows
+    )
+
+    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    # 8 bytes per row, as the column is int64
+    expected_num_rows = max(num_rows / size_rows, 8 * 1024 * 1024 / size_bytes)
+    assert expected_num_rows == row_groups

From 5c2c59b24a3bce78ac778034ef5ee690621851b9 Mon Sep 17 00:00:00 2001
From: vuule
Date: Fri, 12 Nov 2021 15:44:47 -0800
Subject: [PATCH 09/16] clean up

---
 cpp/include/cudf/io/parquet.hpp        | 5 +----
 cpp/tests/io/parquet_test.cpp          | 2 +-
 python/cudf/cudf/tests/test_parquet.py | 5 +++--
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index b53ba2f5a15..15a31b3b442 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -485,7 +485,7 @@ class parquet_writer_options {
   auto get_row_group_size_bytes() const { return _row_group_size_bytes; }
 
   /**
-   * @brief Returns maximum rowngroup size, in rows.
+   * @brief Returns maximum row group size, in rows.
    */
   auto get_row_group_size_rows() const { return _row_group_size_rows; }
 
@@ -530,9 +530,6 @@ class parquet_writer_options {
 
   /**
    * @brief Sets the maximum row group size, in bytes.
-   *
-   * If the row group size is smaller that the page size, page size will be reduced to match
-   * the row_group size.
    */
   void set_row_group_size_bytes(size_t size_bytes)
   {
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index db126d19bb5..b233819092a 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -3056,7 +3056,7 @@ TEST_F(ParquetReaderTest, EmptyOutput)
   CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
 }
 
-TEST_F(ParquetWriterTest, StripeSizeInvalid)
+TEST_F(ParquetWriterTest, RowGroupSizeInvalid)
 {
   const auto unused_table = std::make_unique<table>();
   std::vector<char> out_buffer;
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index eddd0042334..bd570556087 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -233,7 +233,7 @@ def _make_parquet_path_or_buf(src):
 
 @pytest.fixture(scope="module")
 def large_int64_gdf():
-    return cudf.DataFrame.from_pandas(pd.DataFrame({"col": range(0, 1000000)}))
+    return cudf.DataFrame.from_pandas(pd.DataFrame({"col": range(0, 1 << 20)}))
 
 
 @pytest.mark.filterwarnings("ignore:Using CPU")
@@ -2146,6 +2146,7 @@ def test_parquet_reader_brotli(datadir):
 
     assert_eq(expect, got)
 
+
 @pytest.mark.parametrize("size_bytes", [4 << 20, 1 << 20, 512 << 10])
 @pytest.mark.parametrize("size_rows", [1000000, 100000, 10000])
 def test_parquet_writer_row_group_size(
@@ -2158,5 +2159,5 @@ def test_parquet_writer_row_group_size(
 
     num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
     # 8 bytes per row, as the column is int64
-    expected_num_rows = max(num_rows / size_rows, 8 * 1024 * 1024 / size_bytes)
+    expected_num_rows = max(num_rows / size_rows, 8 * num_rows / size_bytes)
     assert expected_num_rows == row_groups

From 746224147feacf7580cd725909474b034473c2cb Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Mon, 15 Nov 2021 10:33:36 -0800
Subject: [PATCH 10/16] Apply suggestions from code review

Co-authored-by: Bradley Dice
---
 cpp/include/cudf/io/parquet.hpp   | 8 ++++----
 cpp/src/io/parquet/writer_impl.cu | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 15a31b3b442..86b3a8a4569 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -533,7 +533,7 @@ class parquet_writer_options {
   */
   void set_row_group_size_bytes(size_t size_bytes)
   {
-    CUDF_EXPECTS(size_bytes >= 512 << 10, "512KB is the minimum rowgorup size");
+    CUDF_EXPECTS(size_bytes >= 512 * 1024, "The maximum row group size must be at least 512KB.");
     _row_group_size_bytes = size_bytes;
   }
 
@@ -542,7 +542,7 @@ class parquet_writer_options {
   */
   void set_row_group_size_rows(size_type size_rows)
   {
-    CUDF_EXPECTS(size_rows >= 5000, "Maximum row group size cannot be smaller than 5000");
+    CUDF_EXPECTS(size_rows >= 5000, "The maximum row group size must be at least 5000 rows.");
     _row_group_size_rows = size_rows;
   }
 
@@ -810,7 +810,7 @@ class chunked_parquet_writer_options {
   */
   void set_row_group_size_bytes(size_t size_bytes)
   {
-    CUDF_EXPECTS(size_bytes >= 512 << 10, "512KB is the minimum rowgorup size");
+    CUDF_EXPECTS(size_bytes >= 512 * 1024, "The maximum row group size must be at least 512KB.");
     _row_group_size_bytes = size_bytes;
   }
 
@@ -819,7 +819,7 @@ class chunked_parquet_writer_options {
   */
   void set_row_group_size_rows(size_type size_rows)
   {
-    CUDF_EXPECTS(size_rows >= 5000, "Maximum row group size cannot be smaller than 5000");
+    CUDF_EXPECTS(size_rows >= 5000, "The maximum row group size must be at least 5000 rows.");
     _row_group_size_rows = size_rows;
   }
 
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 4c22f3d0590..b4e11806049 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1277,7 +1277,7 @@ void writer::impl::write(table_view const& table)
   // Initialize batches of rowgroups to encode (mainly to limit peak memory usage)
   std::vector<size_type> batch_list;
   size_type num_pages = 0;
-  size_t max_bytes_in_batch = 1024 * 1024 * 1024;  // 1GB - TBD: Tune this
+  size_t max_bytes_in_batch = 1024 * 1024 * 1024;  // 1GB - TODO: Tune this
   size_t max_uncomp_bfr_size = 0;
   size_t max_comp_bfr_size = 0;
   size_t max_chunk_bfr_size = 0;
@@ -1336,7 +1336,7 @@ void writer::impl::write(table_view const& table)
     uint8_t* bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
     uint8_t* bfr_c = static_cast<uint8_t*>(comp_bfr.data());
     for (size_type j = 0; j < batch_list[b]; j++, r++) {
-      for (int i = 0; i < num_columns; i++) {
+      for (size_type i = 0; i < num_columns; i++) {
         gpu::EncColumnChunk* ck = &chunks[r][i];
         ck->uncompressed_bfr = bfr;
         ck->compressed_bfr = bfr_c;

From 9b4fc672ffac5744a654a7f1d28e8efcd8cf5d0f Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Mon, 15 Nov 2021 10:38:57 -0800
Subject: [PATCH 11/16] missed renames

Co-authored-by: Bradley Dice
---
 python/cudf/cudf/_lib/cpp/io/orc.pxd | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/cudf/cudf/_lib/cpp/io/orc.pxd b/python/cudf/cudf/_lib/cpp/io/orc.pxd
index c1325d43539..10f1344c908 100644
--- a/python/cudf/cudf/_lib/cpp/io/orc.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/orc.pxd
@@ -68,8 +68,8 @@ cdef extern from "cudf/io/orc.hpp" \
         orc_writer_options()
         cudf_io_types.sink_info get_sink() except+
         cudf_io_types.compression_type get_compression() except+
-        bool enable_statistics() except+
-        size_t stripe_size_bytes() except+
+        bool is_enabled_statistics() except+
+        size_t get_stripe_size_bytes() except+
         size_type get_stripe_size_rows() except+
         size_type get_row_index_stride() except+

From 61d4e6a9fca3c46afc420a29128af431ccf85996 Mon Sep 17 00:00:00 2001
From: vuule
Date: Mon, 15 Nov 2021 11:29:04 -0800
Subject: [PATCH 12/16] clarify that the minimum row group size is based on page size

---
 cpp/include/cudf/io/parquet.hpp | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 86b3a8a4569..88cf7416506 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -533,7 +533,9 @@ class parquet_writer_options {
   */
   void set_row_group_size_bytes(size_t size_bytes)
   {
-    CUDF_EXPECTS(size_bytes >= 512 * 1024, "The maximum row group size must be at least 512KB.");
+    CUDF_EXPECTS(
+      size_bytes >= 512 * 1024,
+      "The maximum row group size cannot be smaller than the page size, which is 512KB.");
     _row_group_size_bytes = size_bytes;
   }
 
@@ -542,7 +544,9 @@ class parquet_writer_options {
   */
   void set_row_group_size_rows(size_type size_rows)
   {
-    CUDF_EXPECTS(size_rows >= 5000, "The maximum row group size must be at least 5000 rows.");
+    CUDF_EXPECTS(
+      size_rows >= 5000,
+      "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
     _row_group_size_rows = size_rows;
   }
 
@@ -810,7 +814,9 @@ class chunked_parquet_writer_options {
   */
   void set_row_group_size_bytes(size_t size_bytes)
   {
-    CUDF_EXPECTS(size_bytes >= 512 * 1024, "The maximum row group size must be at least 512KB.");
+    CUDF_EXPECTS(
+      size_bytes >= 512 * 1024,
+      "The maximum row group size cannot be smaller than the page size, which is 512KB.");
     _row_group_size_bytes = size_bytes;
   }
 
@@ -819,7 +825,9 @@ class chunked_parquet_writer_options {
   */
   void set_row_group_size_rows(size_type size_rows)
   {
-    CUDF_EXPECTS(size_rows >= 5000, "The maximum row group size must be at least 5000 rows.");
+    CUDF_EXPECTS(
+      size_rows >= 5000,
+      "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
     _row_group_size_rows = size_rows;
   }
 
From 2404be869927effc7c1f377e5a85a4bb5652533e Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Mon, 15 Nov 2021 15:03:19 -0800
Subject: [PATCH 13/16] review suggestion

Co-authored-by: Yunsong Wang
---
 cpp/src/io/parquet/writer_impl.cu | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index b4e11806049..4d24de3037a 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1209,7 +1209,7 @@ void writer::impl::write(table_view const& table)
   hostdevice_2dvector<gpu::EncColumnChunk> chunks(num_rowgroups, num_columns, stream);
   for (size_type r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
        r++, global_r++) {
-    size_type fragments_in_chunk =
+    size_type const fragments_in_chunk =
       (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size;
     md.row_groups[global_r].total_byte_size = 0;
     md.row_groups[global_r].columns.resize(num_columns);

From cbc8ecdd6ecbef2048b0de8e34ee13ab71576394 Mon Sep 17 00:00:00 2001
From: vuule
Date: Mon, 15 Nov 2021 15:51:55 -0800
Subject: [PATCH 14/16] auto

---
 cpp/src/io/parquet/writer_impl.cu | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 4d24de3037a..15c1302b2a8 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1166,11 +1166,10 @@ void writer::impl::write(table_view const& table)
   auto const global_rowgroup_base = static_cast<size_type>(md.row_groups.size());
 
   // Decide row group boundaries based on uncompressed data size
-  size_t rowgroup_size    = 0;
-  size_type num_rowgroups = 0;
-  for (size_type f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments;
-       f++) {
-    size_t fragment_data_size = 0;
+  auto rowgroup_size = 0ul;
+  auto num_rowgroups = 0;
+  for (auto f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments; f++) {
+    auto fragment_data_size = 0ul;
     // Replace with STL algorithm to transform and sum
     for (auto i = 0; i < num_columns; i++) {
       fragment_data_size += fragments[i][f].fragment_data_size;
@@ -1206,13 +1206,13 @@ void writer::impl::write(table_view const& table)
   // Initialize row groups and column chunks
   auto const num_chunks = num_rowgroups * num_columns;
   hostdevice_2dvector<gpu::EncColumnChunk> chunks(num_rowgroups, num_columns, stream);
-  for (size_type r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
+  for (auto r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
        r++, global_r++) {
     size_type const fragments_in_chunk =
       (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size;
     md.row_groups[global_r].total_byte_size = 0;
     md.row_groups[global_r].columns.resize(num_columns);
-    for (size_type i = 0; i < num_columns; i++) {
+    for (auto i = 0; i < num_columns; i++) {
       gpu::EncColumnChunk* ck = &chunks[r][i];
 
       *ck = {};
@@ -1244,8 +1244,8 @@ void writer::impl::write(table_view const& table)
   auto dict_info_owner = build_chunk_dictionaries(chunks, col_desc, num_rows, stream);
-  for (size_type rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) {
-    for (size_type col = 0; col < num_columns; col++) {
+  for (auto rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) {
+    for (auto col = 0; col < num_columns; col++) {
       if (chunks.host_view()[rg][col].use_dictionary) {
         md.row_groups[global_rg].columns[col].meta_data.encodings.push_back(
           Encoding::PLAIN_DICTIONARY);
@@ -1331,11 +1331,11 @@ void writer::impl::write(table_view const& table)
   // This contains stats for both the pages and the rowgroups. TODO: make them separate.
   rmm::device_uvector<statistics_chunk> page_stats(num_stats_bfr, stream);
-  for (uint32_t b = 0, r = 0; b < (uint32_t)batch_list.size(); b++) {
-    uint8_t* bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
-    uint8_t* bfr_c = static_cast<uint8_t*>(comp_bfr.data());
-    for (size_type j = 0; j < batch_list[b]; j++, r++) {
-      for (size_type i = 0; i < num_columns; i++) {
+  for (auto b = 0, r = 0; b < static_cast<size_type>(batch_list.size()); b++) {
+    auto bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
+    auto bfr_c = static_cast<uint8_t*>(comp_bfr.data());
+    for (auto j = 0; j < batch_list[b]; j++, r++) {
+      for (auto i = 0; i < num_columns; i++) {
         gpu::EncColumnChunk* ck = &chunks[r][i];
         ck->uncompressed_bfr = bfr;
         ck->compressed_bfr = bfr_c;
@@ -1360,7 +1360,7 @@ void writer::impl::write(table_view const& table)
   pinned_buffer<uint8_t> host_bfr{nullptr, cudaFreeHost};
 
   // Encode row groups in batches
-  for (size_type b = 0, r = 0, global_r = global_rowgroup_base;
+  for (auto b = 0, r = 0, global_r = global_rowgroup_base;
        b < static_cast<size_type>(batch_list.size());
        b++) {
     // Count pages in this batch

From de502cfc83103269e43547f3b95dcd3356f52939 Mon Sep 17 00:00:00 2001
From: vuule
Date: Mon, 15 Nov 2021 23:27:52 -0800
Subject: [PATCH 15/16] test fix

---
 python/cudf/cudf/tests/test_parquet.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index bd570556087..ad3d1dca0fb 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -2147,8 +2147,8 @@ def test_parquet_reader_brotli(datadir):
     assert_eq(expect, got)
 
 
-@pytest.mark.parametrize("size_bytes", [4 << 20, 1 << 20, 512 << 10])
-@pytest.mark.parametrize("size_rows", [1000000, 100000, 10000])
+@pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000])
+@pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000])
 def test_parquet_writer_row_group_size(
     tmpdir, large_int64_gdf, size_bytes, size_rows
 ):
@@ -2159,5 +2159,7 @@ def test_parquet_writer_row_group_size(
 
     num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
     # 8 bytes per row, as the column is int64
-    expected_num_rows = max(num_rows / size_rows, 8 * num_rows / size_bytes)
+    expected_num_rows = max(
+        math.ceil(num_rows / size_rows), math.ceil(8 * num_rows / size_bytes)
+    )
     assert expected_num_rows == row_groups

From a9e04f7833174348fab2ab84a36f6bf7ed9e6225 Mon Sep 17 00:00:00 2001
From: vuule
Date: Tue, 16 Nov 2021 12:01:33 -0800
Subject: [PATCH 16/16] py docs

---
 python/cudf/cudf/utils/ioutils.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 6746753249c..11994830fed 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -221,6 +221,12 @@
     timestamp[us] to the int96 format, which is the number of Julian
     days and the number of nanoseconds since midnight. If ``False``,
     timestamps will not be altered.
+row_group_size_bytes: integer or None, default None
+    Maximum size of each stripe of the output.
+    If None, 13369344 (128MB) will be used.
+row_group_size_rows: integer or None, default None
+    Maximum number of rows of each stripe of the output.
+    If None, 1000000 will be used.
See Also @@ -404,10 +410,10 @@ stripe_size_bytes: integer or None, default None Maximum size of each stripe of the output. If None, 67108864 (64MB) will be used. -stripe_size_rows: integer or None, default None 1000000 +stripe_size_rows: integer or None, default None Maximum number of rows of each stripe of the output. If None, 1000000 will be used. -row_index_stride: integer or None, default None 10000 +row_index_stride: integer or None, default None Row index stride (maximum number of rows in each row group). If None, 10000 will be used.
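---

Taken together, the series exposes `row_group_size_bytes` and `row_group_size_rows` on both Parquet writer option classes. As a usage sketch of the resulting C++ API — the function name, output buffer, and limit values below are illustrative, not taken from the patches; only the builder calls and the 512KB/5000-row minimums are from the changeset:

```cpp
#include <cudf/io/parquet.hpp>

#include <vector>

// Sketch: cap Parquet row groups at 4MB or 10000 rows, whichever limit is
// reached first. Values below 512 * 1024 bytes or 5000 rows make the
// setters throw cudf::logic_error, as the RowGroupSizeInvalid test checks.
void write_with_row_group_limits(cudf::table_view const& table)
{
  std::vector<char> out_buffer;
  auto const options =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&out_buffer), table)
      .row_group_size_bytes(4 * 1024 * 1024)
      .row_group_size_rows(10000)
      .build();
  cudf::io::write_parquet(options);
}
```

The Python path added in patches 07/08 is the equivalent `df.to_parquet(path, row_group_size_bytes=..., row_group_size_rows=...)`.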
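The expectation in `test_parquet_writer_row_group_size` also has a closed form. A sketch under the same assumption the test makes — a single int64 column, so 8 bytes per row; `expected_row_groups` is an illustrative name, not part of the changeset:

```cpp
#include <algorithm>
#include <cstddef>

// Number of row groups the writer is expected to produce for one int64
// column: a row group closes once either the row limit or the byte limit
// would be exceeded, so the binding limit is whichever yields more groups.
// Mirrors max(ceil(rows / size_rows), ceil(8 * rows / size_bytes)).
std::size_t expected_row_groups(std::size_t num_rows,
                                std::size_t size_bytes,
                                std::size_t size_rows)
{
  std::size_t const by_rows  = (num_rows + size_rows - 1) / size_rows;
  std::size_t const by_bytes = (8 * num_rows + size_bytes - 1) / size_bytes;
  return std::max(by_rows, by_bytes);
}
```

For the fixture's 1 << 20 rows with size_bytes=600_000 and size_rows=10_000, this gives max(105, 14) = 105 row groups.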