diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp
index 14f27ef8eef..98922ad10a4 100644
--- a/cpp/include/cudf/io/detail/parquet.hpp
+++ b/cpp/include/cudf/io/detail/parquet.hpp
@@ -148,7 +148,7 @@ class writer {
    * @param[in] metadata_list List of input file metadata
    * @return A parquet-compatible blob that contains the data for all rowgroups in the list
    */
-  static std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
+  static std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
     const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);
 };
diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp
index fb1199fc166..3bc2e6c9ef2 100644
--- a/cpp/include/cudf/io/orc.hpp
+++ b/cpp/include/cudf/io/orc.hpp
@@ -475,24 +475,24 @@ class orc_writer_options {
   /**
    * @brief Whether writing column statistics is enabled/disabled.
    */
-  bool enable_statistics() const { return _enable_statistics; }
+  bool is_enabled_statistics() const { return _enable_statistics; }
 
   /**
    * @brief Returns maximum stripe size, in bytes.
    */
-  auto stripe_size_bytes() const { return _stripe_size_bytes; }
+  auto get_stripe_size_bytes() const { return _stripe_size_bytes; }
 
   /**
    * @brief Returns maximum stripe size, in rows.
    */
-  auto stripe_size_rows() const { return _stripe_size_rows; }
+  auto get_stripe_size_rows() const { return _stripe_size_rows; }
 
   /**
    * @brief Returns the row index stride.
    */
-  auto row_index_stride() const
+  auto get_row_index_stride() const
   {
-    auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows());
+    auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows());
     return unaligned_stride - unaligned_stride % 8;
   }
 
@@ -769,24 +769,24 @@ class chunked_orc_writer_options {
   /**
    * @brief Whether writing column statistics is enabled/disabled.
    */
-  bool enable_statistics() const { return _enable_statistics; }
+  bool is_enabled_statistics() const { return _enable_statistics; }
 
   /**
    * @brief Returns maximum stripe size, in bytes.
    */
-  auto stripe_size_bytes() const { return _stripe_size_bytes; }
+  auto get_stripe_size_bytes() const { return _stripe_size_bytes; }
 
   /**
    * @brief Returns maximum stripe size, in rows.
    */
-  auto stripe_size_rows() const { return _stripe_size_rows; }
+  auto get_stripe_size_rows() const { return _stripe_size_rows; }
 
   /**
    * @brief Returns the row index stride.
    */
-  auto row_index_stride() const
+  auto get_row_index_stride() const
   {
-    auto const unaligned_stride = std::min(_row_index_stride, stripe_size_rows());
+    auto const unaligned_stride = std::min(_row_index_stride, get_stripe_size_rows());
     return unaligned_stride - unaligned_stride % 8;
   }
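For orientation, a minimal usage sketch (not part of this diff) showing the renamed ORC accessors next to the existing builder; the output path and the `tbl` argument are placeholder assumptions:

```cpp
#include <cudf/io/orc.hpp>

// Hypothetical usage; "example.orc" and `tbl` are placeholders.
void write_orc_example(cudf::table_view tbl)
{
  auto opts = cudf::io::orc_writer_options::builder(cudf::io::sink_info{"example.orc"}, tbl)
                .enable_statistics(true)
                .build();

  // Renamed accessors (old names: enable_statistics(), stripe_size_bytes(),
  // stripe_size_rows(), row_index_stride()):
  if (opts.is_enabled_statistics() && opts.get_stripe_size_rows() > 0) {
    cudf::io::write_orc(opts);
  }
}
```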
diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 660ec051304..88cf7416506 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -37,6 +37,9 @@ namespace io {
  * @file
  */
 
+constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024;  // 128MB
+constexpr size_type default_row_group_size_rows = 1000000;
+
 /**
  * @brief Builds parquet_reader_options to use for `read_parquet()`.
  */
@@ -398,6 +401,10 @@ class parquet_writer_options {
   bool _write_timestamps_as_int96 = false;
   // Column chunks file path to be set in the raw output metadata
   std::string _column_chunks_file_path;
+  // Maximum size of each row group (unless smaller than a single page)
+  size_t _row_group_size_bytes = default_row_group_size_bytes;
+  // Maximum number of rows in row group (unless smaller than a single page)
+  size_type _row_group_size_rows = default_row_group_size_rows;
 
   /**
    * @brief Constructor from sink and table.
@@ -472,6 +479,16 @@ class parquet_writer_options {
    */
   std::string get_column_chunks_file_path() const { return _column_chunks_file_path; }
 
+  /**
+   * @brief Returns maximum row group size, in bytes.
+   */
+  auto get_row_group_size_bytes() const { return _row_group_size_bytes; }
+
+  /**
+   * @brief Returns maximum row group size, in rows.
+   */
+  auto get_row_group_size_rows() const { return _row_group_size_rows; }
+
   /**
    * @brief Sets metadata.
    *
@@ -510,6 +527,28 @@ class parquet_writer_options {
   {
     _column_chunks_file_path.assign(file_path);
   }
+
+  /**
+   * @brief Sets the maximum row group size, in bytes.
+   */
+  void set_row_group_size_bytes(size_t size_bytes)
+  {
+    CUDF_EXPECTS(
+      size_bytes >= 512 * 1024,
+      "The maximum row group size cannot be smaller than the page size, which is 512KB.");
+    _row_group_size_bytes = size_bytes;
+  }
+
+  /**
+   * @brief Sets the maximum row group size, in rows.
+   */
+  void set_row_group_size_rows(size_type size_rows)
+  {
+    CUDF_EXPECTS(
+      size_rows >= 5000,
+      "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
+    _row_group_size_rows = size_rows;
+  }
 };
 
 class parquet_writer_options_builder {
@@ -582,6 +621,30 @@ class parquet_writer_options_builder {
     return *this;
   }
 
+  /**
+   * @brief Sets the maximum row group size, in bytes.
+   *
+   * @param val maximum row group size
+   * @return this for chaining.
+   */
+  parquet_writer_options_builder& row_group_size_bytes(size_t val)
+  {
+    options.set_row_group_size_bytes(val);
+    return *this;
+  }
+
+  /**
+   * @brief Sets the maximum number of rows in output row groups.
+   *
+   * @param val maximum number of rows
+   * @return this for chaining.
+   */
+  parquet_writer_options_builder& row_group_size_rows(size_type val)
+  {
+    options.set_row_group_size_rows(val);
+    return *this;
+  }
+
   /**
    * @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
    *
@@ -637,7 +700,7 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(
  * @param[in] metadata_list List of input file metadata.
  * @return A parquet-compatible blob that contains the data for all row groups in the list.
  */
-std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
+std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
   const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list);
 
 /**
@@ -660,6 +723,10 @@ class chunked_parquet_writer_options {
   // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
   // If true then overrides any per-column setting in _metadata.
   bool _write_timestamps_as_int96 = false;
+  // Maximum size of each row group (unless smaller than a single page)
+  size_t _row_group_size_bytes = default_row_group_size_bytes;
+  // Maximum number of rows in row group (unless smaller than a single page)
+  size_type _row_group_size_rows = default_row_group_size_rows;
 
   /**
    * @brief Constructor from sink.
@@ -703,6 +770,16 @@ class chunked_parquet_writer_options {
    */
   bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }
 
+  /**
+   * @brief Returns maximum row group size, in bytes.
+   */
+  auto get_row_group_size_bytes() const { return _row_group_size_bytes; }
+
+  /**
+   * @brief Returns maximum row group size, in rows.
+   */
+  auto get_row_group_size_rows() const { return _row_group_size_rows; }
+
   /**
    * @brief Sets metadata.
    *
@@ -732,6 +809,28 @@ class chunked_parquet_writer_options {
    */
   void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }
 
+  /**
+   * @brief Sets the maximum row group size, in bytes.
+   */
+  void set_row_group_size_bytes(size_t size_bytes)
+  {
+    CUDF_EXPECTS(
+      size_bytes >= 512 * 1024,
+      "The maximum row group size cannot be smaller than the page size, which is 512KB.");
+    _row_group_size_bytes = size_bytes;
+  }
+
+  /**
+   * @brief Sets the maximum row group size, in rows.
+   */
+  void set_row_group_size_rows(size_type size_rows)
+  {
+    CUDF_EXPECTS(
+      size_rows >= 5000,
+      "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
+    _row_group_size_rows = size_rows;
+  }
+
   /**
    * @brief creates builder to build chunked_parquet_writer_options.
    *
@@ -811,6 +910,30 @@ class chunked_parquet_writer_options_builder {
     return *this;
   }
 
+  /**
+   * @brief Sets the maximum row group size, in bytes.
+   *
+   * @param val maximum row group size
+   * @return this for chaining.
+   */
+  chunked_parquet_writer_options_builder& row_group_size_bytes(size_t val)
+  {
+    options.set_row_group_size_bytes(val);
+    return *this;
+  }
+
+  /**
+   * @brief Sets the maximum number of rows in output row groups.
+   *
+   * @param val maximum number of rows
+   * @return this for chaining.
+   */
+  chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
+  {
+    options.set_row_group_size_rows(val);
+    return *this;
+  }
+
   /**
    * @brief move chunked_parquet_writer_options member once it's built.
    */
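To illustrate the new knobs, a hedged sketch (not part of the patch) using the builder methods added above; the output path and `tbl` are placeholders. Values below the 512KB / 5000-row floors would trip the `CUDF_EXPECTS` checks and throw `cudf::logic_error`:

```cpp
#include <cudf/io/parquet.hpp>

// Hypothetical usage; "example.parquet" and `tbl` are placeholders.
void write_parquet_example(cudf::table_view tbl)
{
  auto opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"example.parquet"}, tbl)
      .row_group_size_bytes(64 * 1024 * 1024)  // cap each row group at 64MB...
      .row_group_size_rows(500000)             // ...or 500k rows, whichever limit is hit first
      .build();
  cudf::io::write_parquet(opts);
}
```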
diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp
index b678941db21..a8ca1d3a459 100644
--- a/cpp/src/io/functions.cpp
+++ b/cpp/src/io/functions.cpp
@@ -405,13 +405,13 @@ table_with_metadata read_parquet(parquet_reader_options const& options,
 }
 
 /**
- * @copydoc cudf::io::merge_rowgroup_metadata
+ * @copydoc cudf::io::merge_row_group_metadata
  */
-std::unique_ptr<std::vector<uint8_t>> merge_rowgroup_metadata(
+std::unique_ptr<std::vector<uint8_t>> merge_row_group_metadata(
   const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list)
 {
   CUDF_FUNC_RANGE();
-  return detail_parquet::writer::merge_rowgroup_metadata(metadata_list);
+  return detail_parquet::writer::merge_row_group_metadata(metadata_list);
 }
 
 table_input_metadata::table_input_metadata(table_view const& table,
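A hypothetical sketch of the renamed API in use (not in this PR): write two files with `column_chunks_file_path` set so each call returns its metadata blob, then merge the blobs into a single `_metadata` payload. Paths and the two-file setup are illustrative only.

```cpp
#include <cudf/io/parquet.hpp>

#include <memory>
#include <vector>

// Hypothetical two-file setup; "part1.parquet" / "part2.parquet" are placeholders.
std::unique_ptr<std::vector<uint8_t>> merged_metadata(cudf::table_view t1, cudf::table_view t2)
{
  std::vector<std::unique_ptr<std::vector<uint8_t>>> blobs;
  blobs.push_back(cudf::io::write_parquet(
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"part1.parquet"}, t1)
      .column_chunks_file_path("part1.parquet")
      .build()));
  blobs.push_back(cudf::io::write_parquet(
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"part2.parquet"}, t2)
      .column_chunks_file_path("part2.parquet")
      .build()));
  return cudf::io::merge_row_group_metadata(blobs);
}
```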
diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu
index 2bf020d08a2..1563e3e1fd7 100644
--- a/cpp/src/io/orc/writer_impl.cu
+++ b/cpp/src/io/orc/writer_impl.cu
@@ -1301,10 +1301,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
-    max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
-    row_index_stride{options.row_index_stride()},
+    max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
+    row_index_stride{options.get_row_index_stride()},
     compression_kind_(to_orc_compression(options.get_compression())),
-    enable_statistics_(options.enable_statistics()),
+    enable_statistics_(options.is_enabled_statistics()),
     single_write_mode(mode == SingleWriteMode::YES),
     out_sink_(std::move(sink))
 {
@@ -1321,10 +1321,10 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
-    max_stripe_size{options.stripe_size_bytes(), options.stripe_size_rows()},
-    row_index_stride{options.row_index_stride()},
+    max_stripe_size{options.get_stripe_size_bytes(), options.get_stripe_size_rows()},
+    row_index_stride{options.get_row_index_stride()},
     compression_kind_(to_orc_compression(options.get_compression())),
-    enable_statistics_(options.enable_statistics()),
+    enable_statistics_(options.is_enabled_statistics()),
     single_write_mode(mode == SingleWriteMode::YES),
     out_sink_(std::move(sink))
 {
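As a worked illustration of the `get_row_index_stride()` alignment these constructors now rely on (example values are ours, not from the patch):

```cpp
#include <algorithm>

// Mirrors the accessor's arithmetic: clamp the stride to the stripe row limit,
// then round down to a multiple of 8.
constexpr int aligned_stride(int row_index_stride, int stripe_size_rows)
{
  int const unaligned = std::min(row_index_stride, stripe_size_rows);
  return unaligned - unaligned % 8;
}
static_assert(aligned_stride(10000, 1000000) == 10000);  // defaults: already a multiple of 8
static_assert(aligned_stride(10000, 7500) == 7496);      // clamped to 7500, rounded down
```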
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 2c7d745bb4c..62803432157 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1026,6 +1026,8 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
+    max_row_group_size{options.get_row_group_size_bytes()},
+    max_row_group_rows{options.get_row_group_size_rows()},
     compression_(to_parquet_compression(options.get_compression())),
     stats_granularity_(options.get_stats_level()),
     int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1045,6 +1047,8 @@ writer::impl::impl(std::unique_ptr<data_sink> sink,
                    rmm::mr::device_memory_resource* mr)
   : _mr(mr),
     stream(stream),
+    max_row_group_size{options.get_row_group_size_bytes()},
+    max_row_group_rows{options.get_row_group_size_rows()},
     compression_(to_parquet_compression(options.get_compression())),
     stats_granularity_(options.get_stats_level()),
     int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1148,8 +1152,7 @@ void writer::impl::write(table_view const& table)
   // compression/decompression performance).
   using cudf::io::parquet::gpu::max_page_fragment_size;
 
-  uint32_t num_fragments =
-    (uint32_t)((num_rows + max_page_fragment_size - 1) / max_page_fragment_size);
+  size_type const num_fragments = (num_rows + max_page_fragment_size - 1) / max_page_fragment_size;
 
   cudf::detail::hostdevice_2dvector<gpu::PageFragment> fragments(
     num_columns, num_fragments, stream);
@@ -1162,21 +1165,20 @@ void writer::impl::write(table_view const& table)
     init_page_fragments(fragments, col_desc, num_rows, max_page_fragment_size);
   }
 
-  size_t global_rowgroup_base = md.row_groups.size();
+  auto const global_rowgroup_base = static_cast<size_type>(md.row_groups.size());
 
   // Decide row group boundaries based on uncompressed data size
-  size_t rowgroup_size   = 0;
-  uint32_t num_rowgroups = 0;
-  for (uint32_t f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments;
-       f++) {
-    size_t fragment_data_size = 0;
+  auto rowgroup_size = 0ul;
+  auto num_rowgroups = 0;
+  for (auto f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments; f++) {
+    auto fragment_data_size = 0ul;
     // Replace with STL algorithm to transform and sum
     for (auto i = 0; i < num_columns; i++) {
       fragment_data_size += fragments[i][f].fragment_data_size;
     }
     if (f > rowgroup_start &&
-        (rowgroup_size + fragment_data_size > max_rowgroup_size_ ||
-         (f + 1 - rowgroup_start) * max_page_fragment_size > max_rowgroup_rows_)) {
+        (rowgroup_size + fragment_data_size > max_row_group_size ||
+         (f + 1 - rowgroup_start) * max_page_fragment_size > max_row_group_rows)) {
       // update schema
       md.row_groups.resize(md.row_groups.size() + 1);
       md.row_groups[global_r++].num_rows = (f - rowgroup_start) * max_page_fragment_size;
@@ -1204,15 +1206,15 @@ void writer::impl::write(table_view const& table)
     }
   }
   // Initialize row groups and column chunks
-  uint32_t num_chunks = num_rowgroups * num_columns;
+  auto const num_chunks = num_rowgroups * num_columns;
   hostdevice_2dvector<gpu::EncColumnChunk> chunks(num_rowgroups, num_columns, stream);
-  for (uint32_t r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
+  for (auto r = 0, global_r = global_rowgroup_base, f = 0, start_row = 0; r < num_rowgroups;
        r++, global_r++) {
-    uint32_t fragments_in_chunk = (uint32_t)(
-      (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size);
+    size_type const fragments_in_chunk =
+      (md.row_groups[global_r].num_rows + max_page_fragment_size - 1) / max_page_fragment_size;
     md.row_groups[global_r].total_byte_size = 0;
     md.row_groups[global_r].columns.resize(num_columns);
-    for (int i = 0; i < num_columns; i++) {
+    for (auto i = 0; i < num_columns; i++) {
       gpu::EncColumnChunk* ck = &chunks[r][i];
 
       *ck = {};
@@ -1244,8 +1246,8 @@ void writer::impl::write(table_view const& table)
   }
 
   auto dict_info_owner = build_chunk_dictionaries(chunks, col_desc, num_rows, stream);
-  for (uint32_t rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) {
-    for (int col = 0; col < num_columns; col++) {
+  for (auto rg = 0, global_rg = global_rowgroup_base; rg < num_rowgroups; rg++, global_rg++) {
+    for (auto col = 0; col < num_columns; col++) {
       if (chunks.host_view()[rg][col].use_dictionary) {
         md.row_groups[global_rg].columns[col].meta_data.encodings.push_back(
           Encoding::PLAIN_DICTIONARY);
@@ -1274,16 +1276,16 @@ void writer::impl::write(table_view const& table)
   }
 
   // Initialize batches of rowgroups to encode (mainly to limit peak memory usage)
-  std::vector<uint32_t> batch_list;
-  uint32_t num_pages          = 0;
-  size_t max_bytes_in_batch   = 1024 * 1024 * 1024;  // 1GB - TBD: Tune this
-  size_t max_uncomp_bfr_size  = 0;
-  size_t max_comp_bfr_size    = 0;
-  size_t max_chunk_bfr_size   = 0;
-  uint32_t max_pages_in_batch = 0;
-  size_t bytes_in_batch       = 0;
-  size_t comp_bytes_in_batch  = 0;
-  for (uint32_t r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) {
+  std::vector<size_type> batch_list;
+  size_type num_pages          = 0;
+  size_t max_bytes_in_batch    = 1024 * 1024 * 1024;  // 1GB - TODO: Tune this
+  size_t max_uncomp_bfr_size   = 0;
+  size_t max_comp_bfr_size     = 0;
+  size_t max_chunk_bfr_size    = 0;
+  size_type max_pages_in_batch = 0;
+  size_t bytes_in_batch        = 0;
+  size_t comp_bytes_in_batch   = 0;
+  for (size_type r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) {
     size_t rowgroup_size      = 0;
     size_t comp_rowgroup_size = 0;
     if (r < num_rowgroups) {
@@ -1331,11 +1333,11 @@ void writer::impl::write(table_view const& table)
   // This contains stats for both the pages and the rowgroups. TODO: make them separate.
   rmm::device_uvector<statistics_chunk> page_stats(num_stats_bfr, stream);
 
-  for (uint32_t b = 0, r = 0; b < (uint32_t)batch_list.size(); b++) {
-    uint8_t* bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
-    uint8_t* bfr_c = static_cast<uint8_t*>(comp_bfr.data());
-    for (uint32_t j = 0; j < batch_list[b]; j++, r++) {
-      for (int i = 0; i < num_columns; i++) {
+  for (auto b = 0, r = 0; b < static_cast<size_type>(batch_list.size()); b++) {
+    auto bfr   = static_cast<uint8_t*>(uncomp_bfr.data());
+    auto bfr_c = static_cast<uint8_t*>(comp_bfr.data());
+    for (auto j = 0; j < batch_list[b]; j++, r++) {
+      for (auto i = 0; i < num_columns; i++) {
         gpu::EncColumnChunk* ck = &chunks[r][i];
 
         ck->uncompressed_bfr = bfr;
         ck->compressed_bfr   = bfr_c;
@@ -1360,14 +1362,15 @@ void writer::impl::write(table_view const& table)
   pinned_buffer<uint8_t> host_bfr{nullptr, cudaFreeHost};
 
   // Encode row groups in batches
-  for (uint32_t b = 0, r = 0, global_r = global_rowgroup_base; b < (uint32_t)batch_list.size();
+  for (auto b = 0, r = 0, global_r = global_rowgroup_base;
+       b < static_cast<size_type>(batch_list.size());
        b++) {
     // Count pages in this batch
-    uint32_t rnext               = r + batch_list[b];
-    uint32_t first_page_in_batch = chunks[r][0].first_page;
-    uint32_t first_page_in_next_batch =
+    auto const rnext               = r + batch_list[b];
+    auto const first_page_in_batch = chunks[r][0].first_page;
+    auto const first_page_in_next_batch =
       (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages;
-    uint32_t pages_in_batch = first_page_in_next_batch - first_page_in_batch;
+    auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch;
     // device_span<gpu::EncPage> batch_pages{pages.data() + first_page_in_batch, }
     encode_pages(
       chunks,
@@ -1514,7 +1517,7 @@ std::unique_ptr<std::vector<uint8_t>> writer::close(std::string const& column_chunks_file_path)
   return _impl->close(column_chunks_file_path);
 }
 
-std::unique_ptr<std::vector<uint8_t>> writer::merge_rowgroup_metadata(
+std::unique_ptr<std::vector<uint8_t>> writer::merge_row_group_metadata(
   const std::vector<std::unique_ptr<std::vector<uint8_t>>>& metadata_list)
 {
   std::vector<uint8_t> output;
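The boundary-decision loop above is easier to see in isolation. Below is a simplified, self-contained sketch of the same policy (an assumed helper, not the writer's actual code): fragments of a fixed row count are accumulated into the current row group until adding one more would exceed either the byte or the row limit.

```cpp
#include <cstddef>
#include <vector>

// Each entry in the result is the index of the first fragment of a row group.
std::vector<std::size_t> row_group_starts(std::vector<std::size_t> const& fragment_bytes,
                                          std::size_t max_bytes,
                                          std::size_t max_rows,
                                          std::size_t rows_per_fragment)
{
  std::vector<std::size_t> starts{0};
  std::size_t bytes_in_group = 0;
  for (std::size_t f = 0; f < fragment_bytes.size(); ++f) {
    if (f > starts.back() && (bytes_in_group + fragment_bytes[f] > max_bytes ||
                              (f + 1 - starts.back()) * rows_per_fragment > max_rows)) {
      starts.push_back(f);  // fragment f opens a new row group
      bytes_in_group = 0;
    }
    bytes_in_group += fragment_bytes[f];
  }
  return starts;
}
```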
diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp
index c7cdf8effd1..9188218f607 100644
--- a/cpp/src/io/parquet/writer_impl.hpp
+++ b/cpp/src/io/parquet/writer_impl.hpp
@@ -56,13 +56,6 @@ using cudf::detail::hostdevice_2dvector;
  * @brief Implementation for parquet writer
  */
 class writer::impl {
-  // Parquet datasets are divided into fixed-size, independent rowgroups
-  static constexpr uint32_t DEFAULT_ROWGROUP_MAXSIZE = 128 * 1024 * 1024;  // 128MB
-  static constexpr uint32_t DEFAULT_ROWGROUP_MAXROWS = 1000000;            // Or at most 1M rows
-
-  // rowgroups are divided into pages
-  static constexpr uint32_t DEFAULT_TARGET_PAGE_SIZE = 512 * 1024;
-
  public:
   /**
    * @brief Constructor with writer options.
@@ -209,9 +202,8 @@ class writer::impl {
   // Cuda stream to be used
   rmm::cuda_stream_view stream = rmm::cuda_stream_default;
 
-  size_t max_rowgroup_size_ = DEFAULT_ROWGROUP_MAXSIZE;
-  size_t max_rowgroup_rows_ = DEFAULT_ROWGROUP_MAXROWS;
-  size_t target_page_size_  = DEFAULT_TARGET_PAGE_SIZE;
+  size_t max_row_group_size    = default_row_group_size_bytes;
+  size_type max_row_group_rows = default_row_group_size_rows;
   Compression compression_ = Compression::UNCOMPRESSED;
   statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
   bool int96_timestamps = false;
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 3bae8d7ab1e..b233819092a 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -3056,4 +3056,26 @@ TEST_F(ParquetReaderTest, EmptyOutput)
   CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
 }
 
+TEST_F(ParquetWriterTest, RowGroupSizeInvalid)
+{
+  const auto unused_table = std::make_unique<table>();
+  std::vector<char> out_buffer;
+
+  EXPECT_THROW(
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .row_group_size_rows(4999),
+    cudf::logic_error);
+  EXPECT_THROW(
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .row_group_size_bytes(511 << 10),
+    cudf::logic_error);
+
+  EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                 .row_group_size_rows(4999),
+               cudf::logic_error);
+  EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                 .row_group_size_bytes(511 << 10),
+               cudf::logic_error);
+}
+
 CUDF_TEST_PROGRAM_MAIN()
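The test above probes just below the floors: `511 << 10` is 523264 bytes (one KB under 512KB) and 4999 is one row under the 5000-row minimum. A hypothetical companion check (not in this PR) would confirm the floors themselves are accepted:

```cpp
TEST_F(ParquetWriterTest, RowGroupSizeMinimal)  // hypothetical companion test
{
  const auto unused_table = std::make_unique<table>();
  std::vector<char> out_buffer;

  // Exactly at the documented floors (512 << 10 == 524288 bytes): must not throw.
  EXPECT_NO_THROW(
    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
      .row_group_size_rows(5000)
      .row_group_size_bytes(512 << 10));
}
```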
diff --git a/python/cudf/cudf/_lib/cpp/io/orc.pxd b/python/cudf/cudf/_lib/cpp/io/orc.pxd
index f0450483345..4b5ec913fb6 100644
--- a/python/cudf/cudf/_lib/cpp/io/orc.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/orc.pxd
@@ -72,10 +72,10 @@ cdef extern from "cudf/io/orc.hpp" \
         orc_writer_options()
         cudf_io_types.sink_info get_sink() except+
         cudf_io_types.compression_type get_compression() except+
-        bool enable_statistics() except+
-        size_t stripe_size_bytes() except+
-        size_type stripe_size_rows() except+
-        size_type row_index_stride() except+
+        bool is_enabled_statistics() except+
+        size_t get_stripe_size_bytes() except+
+        size_type get_stripe_size_rows() except+
+        size_type get_row_index_stride() except+
         cudf_table_view.table_view get_table() except+
         const cudf_io_types.table_input_metadata *get_metadata() except+
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index 81ca7e5836b..9d95dce83bc 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -74,6 +74,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         cudf_table_view.table_view get_table() except +
         const cudf_io_types.table_input_metadata get_metadata() except +
         string get_column_chunks_file_path() except+
+        size_t get_row_group_size_bytes() except+
+        size_type get_row_group_size_rows() except+
 
         void set_metadata(
             cudf_io_types.table_input_metadata *m
@@ -87,6 +89,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         void set_column_chunks_file_path(
             string column_chunks_file_path
         ) except +
+        void set_row_group_size_bytes(size_t val) except+
+        void set_row_group_size_rows(size_type val) except+
 
         @staticmethod
         parquet_writer_options_builder builder(
@@ -116,6 +120,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         parquet_writer_options_builder& int96_timestamps(
            bool enabled
         ) except +
+        parquet_writer_options_builder& row_group_size_bytes(
+            size_t val
+        ) except+
+        parquet_writer_options_builder& row_group_size_rows(
+            size_type val
+        ) except+
 
         parquet_writer_options build() except +
 
@@ -130,6 +140,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         cudf_io_types.statistics_freq get_stats_level() except +
         cudf_io_types.table_input_metadata* get_metadata(
         ) except+
+        size_t get_row_group_size_bytes() except+
+        size_type get_row_group_size_rows() except+
 
         void set_metadata(
             cudf_io_types.table_input_metadata *m
@@ -140,6 +152,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         void set_compression(
             cudf_io_types.compression_type compression
         ) except +
+        void set_row_group_size_bytes(size_t val) except+
+        void set_row_group_size_rows(size_type val) except+
 
         @staticmethod
         chunked_parquet_writer_options_builder builder(
@@ -160,6 +174,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         chunked_parquet_writer_options_builder& compression(
             cudf_io_types.compression_type compression
         ) except +
+        chunked_parquet_writer_options_builder& row_group_size_bytes(
+            size_t val
+        ) except+
+        chunked_parquet_writer_options_builder& row_group_size_rows(
+            size_type val
+        ) except+
 
         chunked_parquet_writer_options build() except +
 
@@ -173,6 +193,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
             string column_chunks_file_path,
         ) except+
 
-    cdef unique_ptr[vector[uint8_t]] merge_rowgroup_metadata(
+    cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata(
         const vector[unique_ptr[vector[uint8_t]]]& metadata_list
    ) except +
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 71705f4d0c1..d17184685fa 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -46,7 +46,7 @@ from cudf._lib.column cimport Column
 from cudf._lib.cpp.io.parquet cimport (
     chunked_parquet_writer_options,
     chunked_parquet_writer_options_builder,
-    merge_rowgroup_metadata as parquet_merge_metadata,
+    merge_row_group_metadata as parquet_merge_metadata,
     parquet_chunked_writer as cpp_parquet_chunked_writer,
     parquet_reader_options,
     parquet_writer_options,
@@ -282,7 +282,9 @@ cpdef write_parquet(
         object compression="snappy",
         object statistics="ROWGROUP",
         object metadata_file_path=None,
-        object int96_timestamps=False):
+        object int96_timestamps=False,
+        object row_group_size_bytes=None,
+        object row_group_size_rows=None):
     """
     Cython function to call into libcudf API, see `write_parquet`.
 
@@ -334,7 +336,6 @@ cpdef write_parquet(
     cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression)
     cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics)
 
-    cdef parquet_writer_options args
     cdef unique_ptr[vector[uint8_t]] out_metadata_c
     cdef string c_column_chunks_file_path
     cdef bool _int96_timestamps = int96_timestamps
@@ -342,16 +343,21 @@ cpdef write_parquet(
         c_column_chunks_file_path = str.encode(metadata_file_path)
 
     # Perform write
+    cdef parquet_writer_options args = move(
+        parquet_writer_options.builder(sink, tv)
+        .metadata(tbl_meta.get())
+        .compression(comp_type)
+        .stats_level(stat_freq)
+        .column_chunks_file_path(c_column_chunks_file_path)
+        .int96_timestamps(_int96_timestamps)
+        .build()
+    )
+    if row_group_size_bytes is not None:
+        args.set_row_group_size_bytes(row_group_size_bytes)
+    if row_group_size_rows is not None:
+        args.set_row_group_size_rows(row_group_size_rows)
+
     with nogil:
-        args = move(
-            parquet_writer_options.builder(sink, tv)
-            .metadata(tbl_meta.get())
-            .compression(comp_type)
-            .stats_level(stat_freq)
-            .column_chunks_file_path(c_column_chunks_file_path)
-            .int96_timestamps(_int96_timestamps)
-            .build()
-        )
         out_metadata_c = move(parquet_writer(args))
 
     if metadata_file_path is not None:
@@ -483,11 +489,11 @@ cdef class ParquetWriter:
 
 cpdef merge_filemetadata(object filemetadata_list):
     """
-    Cython function to call into libcudf API, see `merge_rowgroup_metadata`.
+    Cython function to call into libcudf API, see `merge_row_group_metadata`.
 
     See Also
     --------
-    cudf.io.parquet.merge_rowgroup_metadata
+    cudf.io.parquet.merge_row_group_metadata
     """
     cdef vector[unique_ptr[vector[uint8_t]]] list_c
     cdef vector[uint8_t] blob_c
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 302021a082f..9d665d9a0a5 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -441,6 +441,8 @@ def to_parquet(
     statistics="ROWGROUP",
     metadata_file_path=None,
     int96_timestamps=False,
+    row_group_size_bytes=None,
+    row_group_size_rows=None,
     *args,
     **kwargs,
 ):
@@ -480,6 +482,8 @@ def to_parquet(
                 statistics=statistics,
                 metadata_file_path=metadata_file_path,
                 int96_timestamps=int96_timestamps,
+                row_group_size_bytes=row_group_size_bytes,
+                row_group_size_rows=row_group_size_rows,
             )
         else:
             write_parquet_res = libparquet.write_parquet(
@@ -490,6 +494,8 @@ def to_parquet(
                 statistics=statistics,
                 metadata_file_path=metadata_file_path,
                 int96_timestamps=int96_timestamps,
+                row_group_size_bytes=row_group_size_bytes,
+                row_group_size_rows=row_group_size_rows,
             )
 
     return write_parquet_res
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index df31738050b..b6595be9566 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -231,6 +231,11 @@ def _make_parquet_path_or_buf(src):
     yield _make_parquet_path_or_buf
 
 
+@pytest.fixture(scope="module")
+def large_int64_gdf():
+    return cudf.DataFrame.from_pandas(pd.DataFrame({"col": range(0, 1 << 20)}))
+
+
 @pytest.mark.filterwarnings("ignore:Using CPU")
 @pytest.mark.parametrize("engine", ["pyarrow", "cudf"])
 @pytest.mark.parametrize(
@@ -2170,3 +2175,21 @@ def test_parquet_reader_brotli(datadir):
 
     got = cudf.read_parquet(fname).to_pandas(nullable=True)
     assert_eq(expect, got)
+
+
+@pytest.mark.parametrize("size_bytes", [4_000_000, 1_000_000, 600_000])
+@pytest.mark.parametrize("size_rows", [1_000_000, 100_000, 10_000])
+def test_parquet_writer_row_group_size(
+    tmpdir, large_int64_gdf, size_bytes, size_rows
+):
+    fname = tmpdir.join("row_group_size.parquet")
+    large_int64_gdf.to_parquet(
+        fname, row_group_size_bytes=size_bytes, row_group_size_rows=size_rows
+    )
+
+    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    # 8 bytes per row, as the column is int64
+    expected_num_row_groups = max(
+        math.ceil(num_rows / size_rows), math.ceil(8 * num_rows / size_bytes)
+    )
+    assert expected_num_row_groups == row_groups
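To make the test's expectation concrete (our arithmetic, not part of the diff): `large_int64_gdf` holds 1 << 20 = 1,048,576 int64 rows, i.e. 8,388,608 bytes of column data. With `size_rows=100_000` and `size_bytes=1_000_000`, `math.ceil(1048576 / 100000)` is 11 while `math.ceil(8388608 / 1000000)` is 9, so the row-count cap binds and the file should contain max(11, 9) = 11 row groups.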
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 6746753249c..11994830fed 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -221,6 +221,12 @@
     timestamp[us] to the int96 format, which is the number of Julian
     days and the number of nanoseconds since midnight. If ``False``,
     timestamps will not be altered.
+row_group_size_bytes: integer or None, default None
+    Maximum size of each row group of the output.
+    If None, 134217728 (128MB) will be used.
+row_group_size_rows: integer or None, default None
+    Maximum number of rows of each row group of the output.
+    If None, 1000000 will be used.
 
 
 See Also
@@ -404,10 +410,10 @@
 stripe_size_bytes: integer or None, default None
     Maximum size of each stripe of the output.
     If None, 67108864 (64MB) will be used.
-stripe_size_rows: integer or None, default None 1000000
+stripe_size_rows: integer or None, default None
     Maximum number of rows of each stripe of the output.
     If None, 1000000 will be used.
-row_index_stride: integer or None, default None 10000
+row_index_stride: integer or None, default None
     Row index stride (maximum number of rows in each row group).
     If None, 10000 will be used.