Skip to content

Commit

Permalink
Use chunked Parquet writing to reduce memory usage when writing the lineitem table
Browse files Browse the repository at this point in the history
  • Loading branch information
karthikeyann committed Oct 19, 2024
1 parent c034731 commit d3db1ae
Showing 1 changed file with 28 additions and 3 deletions.
31 changes: 28 additions & 3 deletions cpp/benchmarks/ndsh/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "utilities.hpp"

#include "common/ndsh_data_generator/ndsh_data_generator.hpp"
#include "common/table_utilities.hpp"
#include "cudf/detail/utilities/integer_utils.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
Expand All @@ -34,8 +36,11 @@
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <iterator>
#include <unordered_set>

namespace {

Expand Down Expand Up @@ -364,9 +369,29 @@ void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
metadata.schema_info = col_name_infos;
auto const table_input_metadata = cudf::io::table_input_metadata{metadata};

// Declare a host and device buffer
std::vector<char> h_buffer;

auto est_size = static_cast<std::size_t>(estimate_size(table->view()));
constexpr auto PQ_MAX_TABLE_BYTES = 8ul << 30; // 8GB
// TODO: derive this limit from the device memory resource, e.g. percent_of_free_device_memory(50), instead of hard-coding it.
if (est_size > PQ_MAX_TABLE_BYTES) {
auto builder = cudf::io::chunked_parquet_writer_options::builder(source.make_sink_info());
builder.metadata(table_input_metadata);
auto const options = builder.build();
auto num_splits = static_cast<cudf::size_type>(
std::ceil(static_cast<long double>(est_size) / (PQ_MAX_TABLE_BYTES)));
std::vector<cudf::size_type> splits(num_splits - 1);
auto num_rows = table->num_rows();
auto num_row_per_chunk = cudf::util::div_rounding_up_safe(num_rows, num_splits);
std::generate_n(splits.begin(), splits.size(), [num_row_per_chunk, i = 0]() mutable {
return (i += num_row_per_chunk);
});
std::vector<cudf::table_view> split_tables = cudf::split(table->view(), splits, stream);
auto writer = cudf::io::parquet_chunked_writer(options, stream);
for (auto const& chunk_table : split_tables) {
writer.write(chunk_table);
}
writer.close();
return;
}
// Write parquet data to host buffer
auto builder = cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view());
builder.metadata(table_input_metadata);
Expand Down

0 comments on commit d3db1ae

Please sign in to comment.