diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index d44f15f99f7..d6812559e38 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -39,6 +39,8 @@ namespace io {
 constexpr size_t default_row_group_size_bytes = 128 * 1024 * 1024; // 128MB
 constexpr size_type default_row_group_size_rows = 1000000;
+constexpr size_t default_max_page_size_bytes = 512 * 1024;
+constexpr size_type default_max_page_size_rows = 20000;
 
 /**
  * @brief Builds parquet_reader_options to use for `read_parquet()`.
  */
@@ -382,6 +384,10 @@ class parquet_writer_options {
   size_t _row_group_size_bytes = default_row_group_size_bytes;
   // Maximum number of rows in row group (unless smaller than a single page)
   size_type _row_group_size_rows = default_row_group_size_rows;
+  // Maximum size of each page (uncompressed)
+  size_t _max_page_size_bytes = default_max_page_size_bytes;
+  // Maximum number of rows in a page
+  size_type _max_page_size_rows = default_max_page_size_rows;
 
   /**
    * @brief Constructor from sink and table.
@@ -482,6 +488,24 @@ class parquet_writer_options {
    */
   auto get_row_group_size_rows() const { return _row_group_size_rows; }
 
+  /**
+   * @brief Returns the maximum uncompressed page size, in bytes. If set larger than the row group
+   * size, then this will return the row group size.
+   */
+  auto get_max_page_size_bytes() const
+  {
+    return std::min(_max_page_size_bytes, get_row_group_size_bytes());
+  }
+
+  /**
+   * @brief Returns maximum page size, in rows. If set larger than the row group size, then this
+   * will return the row group size.
+   */
+  auto get_max_page_size_rows() const
+  {
+    return std::min(_max_page_size_rows, get_row_group_size_rows());
+  }
+
   /**
    * @brief Sets partitions.
    *
@@ -555,8 +579,8 @@ class parquet_writer_options {
   void set_row_group_size_bytes(size_t size_bytes)
   {
     CUDF_EXPECTS(
-      size_bytes >= 512 * 1024,
-      "The maximum row group size cannot be smaller than the page size, which is 512KB.");
+      size_bytes >= 4 * 1024,
+      "The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
     _row_group_size_bytes = size_bytes;
   }
 
@@ -567,9 +591,29 @@ class parquet_writer_options {
   {
     CUDF_EXPECTS(
       size_rows >= 5000,
-      "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
+      "The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
     _row_group_size_rows = size_rows;
   }
+
+  /**
+   * @brief Sets the maximum uncompressed page size, in bytes.
+   */
+  void set_max_page_size_bytes(size_t size_bytes)
+  {
+    CUDF_EXPECTS(size_bytes >= 4 * 1024, "The maximum page size cannot be smaller than 4KB.");
+    _max_page_size_bytes = size_bytes;
+  }
+
+  /**
+   * @brief Sets the maximum page size, in rows.
+   */
+  void set_max_page_size_rows(size_type size_rows)
+  {
+    CUDF_EXPECTS(
+      size_rows >= 5000,
+      "The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
+    _max_page_size_rows = size_rows;
+  }
 };
 
 class parquet_writer_options_builder {
@@ -690,7 +734,7 @@ class parquet_writer_options_builder {
   /**
    * @brief Sets the maximum number of rows in output row groups.
    *
-   * @param val maximum number or rows
+   * @param val maximum number of rows
    * @return this for chaining.
    */
   parquet_writer_options_builder& row_group_size_rows(size_type val)
@@ -699,6 +743,33 @@ class parquet_writer_options_builder {
     return *this;
   }
 
+  /**
+   * @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
+   * and can be exceeded under certain circumstances. Cannot be larger than the row group size in
+   * bytes, and will be adjusted to match if it is.
+   *
+   * @param val maximum page size
+   * @return this for chaining.
+   */
+  parquet_writer_options_builder& max_page_size_bytes(size_t val)
+  {
+    options.set_max_page_size_bytes(val);
+    return *this;
+  }
+
+  /**
+   * @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
+   * Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
+   *
+   * @param val maximum rows per page
+   * @return this for chaining.
+   */
+  parquet_writer_options_builder& max_page_size_rows(size_type val)
+  {
+    options.set_max_page_size_rows(val);
+    return *this;
+  }
+
   /**
    * @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
    *
@@ -783,6 +854,10 @@ class chunked_parquet_writer_options {
   size_t _row_group_size_bytes = default_row_group_size_bytes;
   // Maximum number of rows in row group (unless smaller than a single page)
   size_type _row_group_size_rows = default_row_group_size_rows;
+  // Maximum size of each page (uncompressed)
+  size_t _max_page_size_bytes = default_max_page_size_bytes;
+  // Maximum number of rows in a page
+  size_type _max_page_size_rows = default_max_page_size_rows;
 
   /**
    * @brief Constructor from sink.
@@ -844,6 +919,24 @@ class chunked_parquet_writer_options {
    */
   auto get_row_group_size_rows() const { return _row_group_size_rows; }
 
+  /**
+   * @brief Returns maximum uncompressed page size, in bytes. If set larger than the row group size,
+   * then this will return the row group size.
+   */
+  auto get_max_page_size_bytes() const
+  {
+    return std::min(_max_page_size_bytes, get_row_group_size_bytes());
+  }
+
+  /**
+   * @brief Returns maximum page size, in rows. If set larger than the row group size, then this
+   * will return the row group size.
+   */
+  auto get_max_page_size_rows() const
+  {
+    return std::min(_max_page_size_rows, get_row_group_size_rows());
+  }
+
   /**
    * @brief Sets metadata.
    *
@@ -891,8 +984,8 @@ class chunked_parquet_writer_options {
   void set_row_group_size_bytes(size_t size_bytes)
   {
     CUDF_EXPECTS(
-      size_bytes >= 512 * 1024,
-      "The maximum row group size cannot be smaller than the page size, which is 512KB.");
+      size_bytes >= 4 * 1024,
+      "The maximum row group size cannot be smaller than the minimum page size, which is 4KB.");
     _row_group_size_bytes = size_bytes;
   }
 
@@ -903,10 +996,30 @@ class chunked_parquet_writer_options {
   {
     CUDF_EXPECTS(
       size_rows >= 5000,
-      "The maximum row group size cannot be smaller than the page size, which is 5000 rows.");
+      "The maximum row group size cannot be smaller than the fragment size, which is 5000 rows.");
     _row_group_size_rows = size_rows;
   }
 
+  /**
+   * @brief Sets the maximum uncompressed page size, in bytes.
+   */
+  void set_max_page_size_bytes(size_t size_bytes)
+  {
+    CUDF_EXPECTS(size_bytes >= 4 * 1024, "The maximum page size cannot be smaller than 4KB.");
+    _max_page_size_bytes = size_bytes;
+  }
+
+  /**
+   * @brief Sets the maximum page size, in rows.
+   */
+  void set_max_page_size_rows(size_type size_rows)
+  {
+    CUDF_EXPECTS(
+      size_rows >= 5000,
+      "The maximum page size cannot be smaller than the fragment size, which is 5000 rows.");
+    _max_page_size_rows = size_rows;
+  }
+
   /**
    * @brief creates builder to build chunked_parquet_writer_options.
    *
@@ -1016,7 +1129,7 @@ class chunked_parquet_writer_options_builder {
   /**
    * @brief Sets the maximum number of rows in output row groups.
    *
-   * @param val maximum number or rows
+   * @param val maximum number of rows
    * @return this for chaining.
   */
   chunked_parquet_writer_options_builder& row_group_size_rows(size_type val)
@@ -1025,6 +1138,33 @@ class chunked_parquet_writer_options_builder {
     return *this;
   }
 
+  /**
+   * @brief Sets the maximum uncompressed page size, in bytes. Serves as a hint to the writer,
+   * and can be exceeded under certain circumstances. Cannot be larger than the row group size in
+   * bytes, and will be adjusted to match if it is.
+   *
+   * @param val maximum page size
+   * @return this for chaining.
+   */
+  chunked_parquet_writer_options_builder& max_page_size_bytes(size_t val)
+  {
+    options.set_max_page_size_bytes(val);
+    return *this;
+  }
+
+  /**
+   * @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting.
+   * Cannot be larger than the row group size in rows, and will be adjusted to match if it is.
+   *
+   * @param val maximum rows per page
+   * @return this for chaining.
+   */
+  chunked_parquet_writer_options_builder& max_page_size_rows(size_type val)
+  {
+    options.set_max_page_size_rows(val);
+    return *this;
+  }
+
   /**
    * @brief move chunked_parquet_writer_options member once it's built.
    */
diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index f05f0af2a79..518eac6f90d 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -240,7 +240,9 @@ __global__ void __launch_bounds__(128)
                statistics_merge_group* page_grstats,
                statistics_merge_group* chunk_grstats,
                size_t max_page_comp_data_size,
-               int32_t num_columns)
+               int32_t num_columns,
+               size_t max_page_size_bytes,
+               size_type max_page_size_rows)
 {
   // TODO: All writing seems to be done by thread 0. Could be replaced by thrust foreach
   __shared__ __align__(8) parquet_column_device_view col_g;
@@ -334,11 +336,16 @@ __global__ void __launch_bounds__(128)
         ? frag_g.num_leaf_values * 2 // Assume worst-case of 2-bytes per dictionary index
         : frag_g.fragment_data_size;
     // TODO (dm): this convoluted logic to limit page size needs refactoring
-    uint32_t max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
-                             : (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
-                                                                       : 512 * 1024;
+    size_t this_max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024
+                                : (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024
+                                                                          : 512 * 1024;
+
+    // override this_max_page_size if the requested size is smaller
+    this_max_page_size = min(this_max_page_size, max_page_size_bytes);
+
     if (num_rows >= ck_g.num_rows ||
-        (values_in_page > 0 && (page_size + fragment_data_size > max_page_size))) {
+        (values_in_page > 0 && (page_size + fragment_data_size > this_max_page_size)) ||
+        rows_in_page > max_page_size_rows) {
       if (ck_g.use_dictionary) {
         page_size =
           1 + 5 + ((values_in_page * ck_g.dict_rle_bits + 7) >> 3) + (values_in_page >> 8);
@@ -1927,6 +1934,8 @@ void InitEncoderPages(device_2dspan chunks,
                       device_span pages,
                       device_span col_desc,
                       int32_t num_columns,
+                      size_t max_page_size_bytes,
+                      size_type max_page_size_rows,
                       statistics_merge_group* page_grstats,
                       statistics_merge_group* chunk_grstats,
                       size_t max_page_comp_data_size,
@@ -1934,8 +1943,15 @@ void InitEncoderPages(device_2dspan chunks,
 {
   auto num_rowgroups = chunks.size().first;
   dim3 dim_grid(num_columns, num_rowgroups); // 1 threadblock per rowgroup
-  gpuInitPages<<>>(
-    chunks, pages, col_desc, page_grstats, chunk_grstats, max_page_comp_data_size, num_columns);
+  gpuInitPages<<>>(chunks,
+                   pages,
+                   col_desc,
+                   page_grstats,
+                   chunk_grstats,
+                   max_page_comp_data_size,
+                   num_columns,
+                   max_page_size_bytes,
+                   max_page_size_rows);
 }
 
 void EncodePages(device_span pages,
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 057b9a87214..1c8e30c51ec 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -575,6 +575,8 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks,
                       device_span pages,
                       device_span col_desc,
                       int32_t num_columns,
+                      size_t max_page_size_bytes,
+                      size_type max_page_size_rows,
                       statistics_merge_group* page_grstats,
                       statistics_merge_group* chunk_grstats,
                       size_t max_page_comp_data_size,
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index dbbd39fb508..062c9378d1d 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -859,7 +859,16 @@ void writer::impl::init_page_sizes(hostdevice_2dvector& chu
                                    uint32_t num_columns)
 {
   chunks.host_to_device(stream);
-  gpu::InitEncoderPages(chunks, {}, col_desc, num_columns, nullptr, nullptr, 0, stream);
+  gpu::InitEncoderPages(chunks,
+                        {},
+                        col_desc,
+                        num_columns,
+                        max_page_size_bytes,
+                        max_page_size_rows,
+                        nullptr,
+                        nullptr,
+                        0,
+                        stream);
   chunks.device_to_host(stream, true);
 }
 
@@ -965,6 +974,8 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector&
                    pages,
                    col_desc,
                    num_columns,
+                   max_page_size_bytes,
+                   max_page_size_rows,
                    (num_stats_bfr) ? page_stats_mrg.data() : nullptr,
                    (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr,
                    max_page_comp_data_size,
@@ -1122,6 +1133,8 @@ writer::impl::impl(std::vector> sinks,
     stream(stream),
     max_row_group_size{options.get_row_group_size_bytes()},
     max_row_group_rows{options.get_row_group_size_rows()},
+    max_page_size_bytes(options.get_max_page_size_bytes()),
+    max_page_size_rows(options.get_max_page_size_rows()),
     compression_(to_parquet_compression(options.get_compression())),
     stats_granularity_(options.get_stats_level()),
     int96_timestamps(options.is_enabled_int96_timestamps()),
@@ -1144,6 +1157,8 @@ writer::impl::impl(std::vector> sinks,
     stream(stream),
     max_row_group_size{options.get_row_group_size_bytes()},
     max_row_group_rows{options.get_row_group_size_rows()},
+    max_page_size_bytes(options.get_max_page_size_bytes()),
+    max_page_size_rows(options.get_max_page_size_rows()),
     compression_(to_parquet_compression(options.get_compression())),
     stats_granularity_(options.get_stats_level()),
     int96_timestamps(options.is_enabled_int96_timestamps()),
diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp
index 405ab0c2880..a8be43eb1ed 100644
--- a/cpp/src/io/parquet/writer_impl.hpp
+++ b/cpp/src/io/parquet/writer_impl.hpp
@@ -210,6 +210,8 @@ class writer::impl {
   size_t max_row_group_size = default_row_group_size_bytes;
   size_type max_row_group_rows = default_row_group_size_rows;
+  size_t max_page_size_bytes = default_max_page_size_bytes;
+  size_type max_page_size_rows = default_max_page_size_rows;
   Compression compression_ = Compression::UNCOMPRESSED;
   statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE;
   bool int96_timestamps = false;
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index 3905df2b274..820d8036455 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -3195,15 +3195,59 @@ TEST_F(ParquetWriterTest, RowGroupSizeInvalid)
     cudf::logic_error);
   EXPECT_THROW(
     cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
-      .row_group_size_bytes(511 << 10),
+      .max_page_size_rows(4999),
+    cudf::logic_error);
+  EXPECT_THROW(
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .row_group_size_bytes(3 << 10),
+    cudf::logic_error);
+  EXPECT_THROW(
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .max_page_size_bytes(3 << 10),
     cudf::logic_error);
   EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
                  .row_group_size_rows(4999),
               cudf::logic_error);
   EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
-                 .row_group_size_bytes(511 << 10),
+                 .max_page_size_rows(4999),
               cudf::logic_error);
+  EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                 .row_group_size_bytes(3 << 10),
+               cudf::logic_error);
+  EXPECT_THROW(cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                 .max_page_size_bytes(3 << 10),
+               cudf::logic_error);
+}
+
+TEST_F(ParquetWriterTest, RowGroupPageSizeMatch)
+{
+  const auto unused_table = std::make_unique();
+  std::vector out_buffer;
+
+  auto options =
+    cudf_io::parquet_writer_options::builder(cudf_io::sink_info(&out_buffer), unused_table->view())
+      .row_group_size_bytes(128 * 1024)
+      .max_page_size_bytes(512 * 1024)
+      .row_group_size_rows(10000)
+      .max_page_size_rows(20000)
+      .build();
+  EXPECT_EQ(options.get_row_group_size_bytes(), options.get_max_page_size_bytes());
+  EXPECT_EQ(options.get_row_group_size_rows(), options.get_max_page_size_rows());
+}
+
+TEST_F(ParquetChunkedWriterTest, RowGroupPageSizeMatch)
+{
+  std::vector out_buffer;
+
+  auto options = cudf_io::chunked_parquet_writer_options::builder(cudf_io::sink_info(&out_buffer))
+                   .row_group_size_bytes(128 * 1024)
+                   .max_page_size_bytes(512 * 1024)
+                   .row_group_size_rows(10000)
+                   .max_page_size_rows(20000)
+                   .build();
+  EXPECT_EQ(options.get_row_group_size_bytes(), options.get_max_page_size_bytes());
+  EXPECT_EQ(options.get_row_group_size_rows(), options.get_max_page_size_rows());
 }
 
 CUDF_TEST_PROGRAM_MAIN()
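Usage note (reviewer aid, not part of the patch): a minimal sketch of how the new builder options are exercised through the public API. The helper name, output path, and size values below are illustrative only; the option names and write_parquet call are the existing cudf::io interfaces touched by this change.

// Sketch only: assumes an existing cudf::table `tbl` and a program linked against libcudf.
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>

void write_with_page_limits(cudf::table const& tbl)
{
  // Request pages of at most ~128KB uncompressed and 10000 top-level rows each.
  // Both values are hints to the writer and are clamped to the row group limits
  // if set larger (see get_max_page_size_bytes/rows in this patch).
  auto opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"out.parquet"},
                                                        tbl.view())
                .max_page_size_bytes(128 * 1024)
                .max_page_size_rows(10000)
                .build();
  cudf::io::write_parquet(opts);
}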