From d3db1aeb34fa191f67c4d4f6d195cef8fa08b71d Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Sat, 19 Oct 2024 15:57:59 +0000 Subject: [PATCH] chunked parquet writing for reducing memory writing lineitem table --- cpp/benchmarks/ndsh/utilities.cpp | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index c19c1361ea2..9f9849860c9 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -17,6 +17,8 @@ #include "utilities.hpp" #include "common/ndsh_data_generator/ndsh_data_generator.hpp" +#include "common/table_utilities.hpp" +#include "cudf/detail/utilities/integer_utils.hpp" #include #include @@ -34,8 +36,11 @@ #include #include +#include #include #include +#include +#include namespace { @@ -364,9 +369,29 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, metadata.schema_info = col_name_infos; auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; - // Declare a host and device buffer - std::vector h_buffer; - + auto est_size = static_cast(estimate_size(table->view())); + constexpr auto PQ_MAX_TABLE_BYTES = 8ul << 30; // 8GB + // TODO: best to get this limit from percent_of_free_device_memory(50) of device memory resource. + if (est_size > PQ_MAX_TABLE_BYTES) { + auto builder = cudf::io::chunked_parquet_writer_options::builder(source.make_sink_info()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + auto num_splits = static_cast( + std::ceil(static_cast(est_size) / (PQ_MAX_TABLE_BYTES))); + std::vector splits(num_splits - 1); + auto num_rows = table->num_rows(); + auto num_row_per_chunk = cudf::util::div_rounding_up_safe(num_rows, num_splits); + std::generate_n(splits.begin(), splits.size(), [num_row_per_chunk, i = 0]() mutable { + return (i += num_row_per_chunk); + }); + std::vector split_tables = cudf::split(table->view(), splits, stream); + auto writer = cudf::io::parquet_chunked_writer(options, stream); + for (auto const& chunk_table : split_tables) { + writer.write(chunk_table); + } + writer.close(); + return; + } // Write parquet data to host buffer auto builder = cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view()); builder.metadata(table_input_metadata);