Use managed memory for NDSH benchmarks (#17039)
Fixes #16987
Use managed memory to generate the parquet data, and write the parquet data to a host buffer.
Replace the use of parquet_device_buffer with cuio_source_sink_pair.
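
For context, here is a minimal sketch (not part of this commit) of the host-buffer sink pattern that `cuio_source_sink_pair(io_type::HOST_BUFFER)` encapsulates; the free function and buffer names are illustrative:

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <vector>

// Illustrative only: encode a table as parquet into pageable host memory,
// so the file bytes never occupy device memory.
std::vector<char> parquet_to_host_buffer(cudf::table_view const& tbl)
{
  std::vector<char> h_buffer;  // parquet bytes accumulate here
  auto const options =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), tbl).build();
  cudf::io::write_parquet(options);
  return h_buffer;  // later wrapped in a cudf::io::source_info to read the table back
}
```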

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Tianyu Liu (https://github.com/kingcrimsontianyu)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: #17039
karthikeyann authored Oct 23, 2024
1 parent deb9af4 commit e7653a7
Showing 8 changed files with 161 additions and 79 deletions.
cpp/benchmarks/CMakeLists.txt (2 changes: 1 addition & 1 deletion)

@@ -49,7 +49,7 @@ target_compile_options(

target_link_libraries(
ndsh_data_generator
PUBLIC cudf GTest::gmock GTest::gtest cudf::cudftestutil nvtx3::nvtx3-cpp
PUBLIC cudf cudf::cudftestutil nvtx3::nvtx3-cpp
PRIVATE $<TARGET_NAME_IF_EXISTS:conda_env>
)

cpp/benchmarks/ndsh/q01.cpp (8 changes: 4 additions & 4 deletions)

@@ -104,7 +104,7 @@
}

void run_ndsh_q1(nvbench::state& state,
std::unordered_map<std::string, parquet_device_buffer>& sources)
std::unordered_map<std::string, cuio_source_sink_pair>& sources)
{
// Define the column projections and filter predicate for `lineitem` table
std::vector<std::string> const lineitem_cols = {"l_returnflag",
@@ -124,8 +124,8 @@ void run_ndsh_q1(nvbench::state& state,
cudf::ast::ast_operator::LESS_EQUAL, shipdate_ref, shipdate_upper_literal);

// Read out the `lineitem` table from parquet file
auto lineitem =
read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred));
auto lineitem = read_parquet(
sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred));

// Calculate the discount price and charge columns and append to lineitem table
auto disc_price =
@@ -170,7 +170,7 @@ void ndsh_q1(nvbench::state& state)
{
// Generate the required parquet files in device buffers
double const scale_factor = state.get_float64("scale_factor");
std::unordered_map<std::string, parquet_device_buffer> sources;
std::unordered_map<std::string, cuio_source_sink_pair> sources;
generate_parquet_data_sources(scale_factor, {"lineitem"}, sources);

auto stream = cudf::get_default_stream();
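
The repeated change from sources["..."] to sources.at("...") in this and the following query files follows from the new mapped type. A sketch of the distinction, assuming (as the switch to emplace in utilities.cpp below suggests) that cuio_source_sink_pair is constructible only from an io_type:

```cpp
#include <string>
#include <unordered_map>
// cuio_source_sink_pair and io_type come from cudf's benchmark I/O
// utilities (cuio_common.hpp); the exact include path is omitted here.

void sketch()
{
  std::unordered_map<std::string, cuio_source_sink_pair> sources;
  // operator[] default-constructs the mapped value on a missing key, which
  // no longer compiles for a type without a default constructor and would
  // silently insert an empty entry on a typo'd key. emplace + at() avoids
  // both: at() throws std::out_of_range for a table that was never generated.
  sources.emplace("lineitem", cuio_source_sink_pair(io_type::HOST_BUFFER));
  sources.at("lineitem").make_source_info();  // ok; at("orders") would throw
}
```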
cpp/benchmarks/ndsh/q05.cpp (16 changes: 8 additions & 8 deletions)

@@ -89,7 +89,7 @@
}

void run_ndsh_q5(nvbench::state& state,
std::unordered_map<std::string, parquet_device_buffer>& sources)
std::unordered_map<std::string, cuio_source_sink_pair>& sources)
{
// Define the column projection and filter predicate for the `orders` table
std::vector<std::string> const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"};
@@ -120,17 +120,17 @@ void run_ndsh_q5(nvbench::state& state,
// Read out the tables from parquet files
// while pushing down the column projections and filter predicates
auto const customer =
read_parquet(sources["customer"].make_source_info(), {"c_custkey", "c_nationkey"});
read_parquet(sources.at("customer").make_source_info(), {"c_custkey", "c_nationkey"});
auto const orders =
read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred));
auto const lineitem = read_parquet(sources["lineitem"].make_source_info(),
read_parquet(sources.at("orders").make_source_info(), orders_cols, std::move(orders_pred));
auto const lineitem = read_parquet(sources.at("lineitem").make_source_info(),
{"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"});
auto const supplier =
read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"});
read_parquet(sources.at("supplier").make_source_info(), {"s_suppkey", "s_nationkey"});
auto const nation =
read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_regionkey", "n_name"});
read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_regionkey", "n_name"});
auto const region =
read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred));
read_parquet(sources.at("region").make_source_info(), region_cols, std::move(region_pred));

// Perform the joins
auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"});
@@ -165,7 +165,7 @@ void ndsh_q5(nvbench::state& state)
{
// Generate the required parquet files in device buffers
double const scale_factor = state.get_float64("scale_factor");
std::unordered_map<std::string, parquet_device_buffer> sources;
std::unordered_map<std::string, cuio_source_sink_pair> sources;
generate_parquet_data_sources(
scale_factor, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources);

cpp/benchmarks/ndsh/q06.cpp (8 changes: 4 additions & 4 deletions)

@@ -64,7 +64,7 @@
}

void run_ndsh_q6(nvbench::state& state,
std::unordered_map<std::string, parquet_device_buffer>& sources)
std::unordered_map<std::string, cuio_source_sink_pair>& sources)
{
// Read out the `lineitem` table from parquet file
std::vector<std::string> const lineitem_cols = {
@@ -83,8 +83,8 @@ void run_ndsh_q6(nvbench::state& state,
cudf::ast::operation(cudf::ast::ast_operator::LESS, shipdate_ref, shipdate_upper_literal);
auto const lineitem_pred = std::make_unique<cudf::ast::operation>(
cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b);
auto lineitem =
read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred));
auto lineitem = read_parquet(
sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred));

// Cast the discount and quantity columns to float32 and append to lineitem table
auto discout_float =
@@ -134,7 +134,7 @@ void ndsh_q6(nvbench::state& state)
{
// Generate the required parquet files in device buffers
double const scale_factor = state.get_float64("scale_factor");
std::unordered_map<std::string, parquet_device_buffer> sources;
std::unordered_map<std::string, cuio_source_sink_pair> sources;
generate_parquet_data_sources(scale_factor, {"lineitem"}, sources);

auto stream = cudf::get_default_stream();
cpp/benchmarks/ndsh/q09.cpp (17 changes: 9 additions & 8 deletions)

@@ -112,20 +112,21 @@
}

void run_ndsh_q9(nvbench::state& state,
std::unordered_map<std::string, parquet_device_buffer>& sources)
std::unordered_map<std::string, cuio_source_sink_pair>& sources)
{
// Read out the table from parquet files
auto const lineitem = read_parquet(
sources["lineitem"].make_source_info(),
sources.at("lineitem").make_source_info(),
{"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"});
auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_name"});
auto const nation =
read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_name"});
auto const orders =
read_parquet(sources["orders"].make_source_info(), {"o_orderkey", "o_orderdate"});
auto const part = read_parquet(sources["part"].make_source_info(), {"p_partkey", "p_name"});
auto const partsupp = read_parquet(sources["partsupp"].make_source_info(),
read_parquet(sources.at("orders").make_source_info(), {"o_orderkey", "o_orderdate"});
auto const part = read_parquet(sources.at("part").make_source_info(), {"p_partkey", "p_name"});
auto const partsupp = read_parquet(sources.at("partsupp").make_source_info(),
{"ps_suppkey", "ps_partkey", "ps_supplycost"});
auto const supplier =
read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"});
read_parquet(sources.at("supplier").make_source_info(), {"s_suppkey", "s_nationkey"});

// Generating the `profit` table
// Filter the part table using `p_name like '%green%'`
@@ -178,7 +179,7 @@ void ndsh_q9(nvbench::state& state)
{
// Generate the required parquet files in device buffers
double const scale_factor = state.get_float64("scale_factor");
std::unordered_map<std::string, parquet_device_buffer> sources;
std::unordered_map<std::string, cuio_source_sink_pair> sources;
generate_parquet_data_sources(
scale_factor, {"part", "supplier", "lineitem", "partsupp", "orders", "nation"}, sources);

cpp/benchmarks/ndsh/q10.cpp (13 changes: 7 additions & 6 deletions)

@@ -94,7 +94,7 @@
}

void run_ndsh_q10(nvbench::state& state,
std::unordered_map<std::string, parquet_device_buffer>& sources)
std::unordered_map<std::string, cuio_source_sink_pair>& sources)
{
// Define the column projection and filter predicate for the `orders` table
std::vector<std::string> const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"};
@@ -122,15 +122,16 @@ void run_ndsh_q10(nvbench::state& state,
// Read out the tables from parquet files
// while pushing down the column projections and filter predicates
auto const customer = read_parquet(
sources["customer"].make_source_info(),
sources.at("customer").make_source_info(),
{"c_custkey", "c_name", "c_nationkey", "c_acctbal", "c_address", "c_phone", "c_comment"});
auto const orders =
read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred));
read_parquet(sources.at("orders").make_source_info(), orders_cols, std::move(orders_pred));
auto const lineitem =
read_parquet(sources["lineitem"].make_source_info(),
read_parquet(sources.at("lineitem").make_source_info(),
{"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"},
std::move(lineitem_pred));
auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"});
auto const nation =
read_parquet(sources.at("nation").make_source_info(), {"n_name", "n_nationkey"});

// Perform the joins
auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"});
@@ -163,7 +164,7 @@ void ndsh_q10(nvbench::state& state)
{
// Generate the required parquet files in device buffers
double const scale_factor = state.get_float64("scale_factor");
std::unordered_map<std::string, parquet_device_buffer> sources;
std::unordered_map<std::string, cuio_source_sink_pair> sources;
generate_parquet_data_sources(
scale_factor, {"customer", "orders", "lineitem", "nation"}, sources);

cpp/benchmarks/ndsh/utilities.cpp (157 changes: 122 additions & 35 deletions)

@@ -17,6 +17,8 @@
#include "utilities.hpp"

#include "common/ndsh_data_generator/ndsh_data_generator.hpp"
#include "common/table_utilities.hpp"
#include "cudf/detail/utilities/integer_utils.hpp"

#include <cudf/column/column_factories.hpp>
#include <cudf/copying.hpp>
@@ -30,8 +32,15 @@
#include <cudf/transform.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/mr/device/managed_memory_resource.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <iterator>
#include <unordered_set>

namespace {

@@ -85,6 +94,15 @@ std::vector<std::string> const NATION_SCHEMA = {
"n_nationkey", "n_name", "n_regionkey", "n_comment"};
std::vector<std::string> const REGION_SCHEMA = {"r_regionkey", "r_name", "r_comment"};

std::unordered_map<std::string, std::vector<std::string> const> const SCHEMAS = {
{"orders", ORDERS_SCHEMA},
{"lineitem", LINEITEM_SCHEMA},
{"part", PART_SCHEMA},
{"partsupp", PARTSUPP_SCHEMA},
{"supplier", SUPPLIER_SCHEMA},
{"customer", CUSTOMER_SCHEMA},
{"nation", NATION_SCHEMA},
{"region", REGION_SCHEMA}};
} // namespace

cudf::table_view table_with_names::table() const { return tbl->view(); }
@@ -337,7 +355,7 @@ int32_t days_since_epoch(int year, int month, int day)

void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
std::vector<std::string> const& col_names,
parquet_device_buffer& source)
cuio_source_sink_pair& source)
{
CUDF_FUNC_RANGE();
auto const stream = cudf::get_default_stream();
@@ -351,55 +369,124 @@ void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
metadata.schema_info = col_name_infos;
auto const table_input_metadata = cudf::io::table_input_metadata{metadata};

// Declare a host and device buffer
std::vector<char> h_buffer;

auto est_size = static_cast<std::size_t>(estimate_size(table->view()));
constexpr auto PQ_MAX_TABLE_BYTES = 8ul << 30; // 8GB
// TODO: best to get this limit from percent_of_free_device_memory(50) of device memory resource.
if (est_size > PQ_MAX_TABLE_BYTES) {
auto builder = cudf::io::chunked_parquet_writer_options::builder(source.make_sink_info());
builder.metadata(table_input_metadata);
auto const options = builder.build();
auto num_splits = static_cast<cudf::size_type>(
std::ceil(static_cast<long double>(est_size) / (PQ_MAX_TABLE_BYTES)));
std::vector<cudf::size_type> splits(num_splits - 1);
auto num_rows = table->num_rows();
auto num_row_per_chunk = cudf::util::div_rounding_up_safe(num_rows, num_splits);
std::generate_n(splits.begin(), splits.size(), [num_row_per_chunk, i = 0]() mutable {
return (i += num_row_per_chunk);
});
std::vector<cudf::table_view> split_tables = cudf::split(table->view(), splits, stream);
auto writer = cudf::io::parquet_chunked_writer(options, stream);
for (auto const& chunk_table : split_tables) {
writer.write(chunk_table);
}
writer.close();
return;
}
// Write parquet data to host buffer
auto builder =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), table->view());
auto builder = cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view());
builder.metadata(table_input_metadata);
auto const options = builder.build();
cudf::io::write_parquet(options);
cudf::io::write_parquet(options, stream);
}

// Copy host buffer to device buffer
source.d_buffer.resize(h_buffer.size(), stream);
CUDF_CUDA_TRY(cudaMemcpyAsync(
source.d_buffer.data(), h_buffer.data(), h_buffer.size(), cudaMemcpyDefault, stream.value()));
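
The 8 GiB chunking branch added above splits oversized tables by rows before writing. A standalone sketch of the same split-point arithmetic (the function name is illustrative), with worked numbers in the comments:

```cpp
#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/types.hpp>

#include <cmath>
#include <cstddef>
#include <vector>

// Row offsets at which to split a table so each chunk stays under max_bytes.
// Example: est_size = 20 GiB, max_bytes = 8 GiB, num_rows = 90
//   num_splits     = ceil(20 / 8) = 3
//   rows_per_chunk = ceil(90 / 3) = 30
//   splits         = {30, 60}  -> cudf::split() then yields 3 table_views
std::vector<cudf::size_type> chunk_split_points(std::size_t est_size,
                                                cudf::size_type num_rows,
                                                std::size_t max_bytes = 8ul << 30)
{
  auto const num_splits = static_cast<cudf::size_type>(
    std::ceil(static_cast<long double>(est_size) / static_cast<long double>(max_bytes)));
  auto const rows_per_chunk = cudf::util::div_rounding_up_safe(num_rows, num_splits);
  std::vector<cudf::size_type> splits;
  for (cudf::size_type i = 1; i < num_splits; ++i) { splits.push_back(i * rows_per_chunk); }
  return splits;
}
```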
inline auto make_managed_pool()
{
return rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(
std::make_shared<rmm::mr::managed_memory_resource>(), rmm::percent_of_free_device_memory(50));
}
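
A brief usage sketch of the helper above: device allocations are routed through a pool of CUDA managed (unified) memory for a scope, so data generation can oversubscribe the GPU and page to host instead of failing outright:

```cpp
// Sketch: make the managed pool current, generate, then restore.
auto managed_pool_mr = make_managed_pool();  // pool over managed memory, 50% of free device memory
auto* old_mr = cudf::get_current_device_resource();
cudf::set_current_device_resource(managed_pool_mr.get());
// ... generate tables that may exceed free device memory ...
cudf::set_current_device_resource(old_mr);   // restore before the timed benchmark runs
```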

void generate_parquet_data_sources(double scale_factor,
std::vector<std::string> const& table_names,
std::unordered_map<std::string, parquet_device_buffer>& sources)
std::unordered_map<std::string, cuio_source_sink_pair>& sources)
{
CUDF_FUNC_RANGE();
std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) {
sources[table_name] = parquet_device_buffer();
});

auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
// Set the memory resource to the managed pool
auto old_mr = cudf::get_current_device_resource();
// if already managed pool or managed, don't create new one.
using managed_pool_mr_t = decltype(make_managed_pool());
managed_pool_mr_t managed_pool_mr;
bool const is_managed =
dynamic_cast<rmm::mr::pool_memory_resource<rmm::mr::managed_memory_resource>*>(old_mr) or
dynamic_cast<rmm::mr::managed_memory_resource*>(old_mr);
if (!is_managed) {
std::cout << "Creating managed pool just for data generation\n";
managed_pool_mr = make_managed_pool();
cudf::set_current_device_resource(managed_pool_mr.get());
// drawback: if already pool takes 50% of free memory, we are left with 50% of 50% of free
// memory.
}

auto partsupp = cudf::datagen::generate_partsupp(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
std::unordered_set<std::string> const requested_table_names = [&table_names]() {
if (table_names.empty()) {
return std::unordered_set<std::string>{
"orders", "lineitem", "part", "partsupp", "supplier", "customer", "nation", "region"};
}
return std::unordered_set(table_names.begin(), table_names.end());
}();
std::for_each(
requested_table_names.begin(), requested_table_names.end(), [&](auto const& table_name) {
sources.emplace(table_name, cuio_source_sink_pair(io_type::HOST_BUFFER));
});
std::unordered_map<std::string, std::unique_ptr<cudf::table>> tables;

if (sources.count("orders") or sources.count("lineitem") or sources.count("part")) {
auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
if (sources.count("orders")) {
write_to_parquet_device_buffer(orders, SCHEMAS.at("orders"), sources.at("orders"));
orders = {};
}
if (sources.count("part")) {
write_to_parquet_device_buffer(part, SCHEMAS.at("part"), sources.at("part"));
part = {};
}
if (sources.count("lineitem")) {
write_to_parquet_device_buffer(lineitem, SCHEMAS.at("lineitem"), sources.at("lineitem"));
lineitem = {};
}
}

if (sources.count("partsupp")) {
auto partsupp = cudf::datagen::generate_partsupp(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
write_to_parquet_device_buffer(partsupp, SCHEMAS.at("partsupp"), sources.at("partsupp"));
}

auto supplier = cudf::datagen::generate_supplier(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
if (sources.count("supplier")) {
auto supplier = cudf::datagen::generate_supplier(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
write_to_parquet_device_buffer(supplier, SCHEMAS.at("supplier"), sources.at("supplier"));
}

auto customer = cudf::datagen::generate_customer(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
if (sources.count("customer")) {
auto customer = cudf::datagen::generate_customer(
scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
write_to_parquet_device_buffer(customer, SCHEMAS.at("customer"), sources.at("customer"));
}

auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(),
cudf::get_current_device_resource_ref());
if (sources.count("nation")) {
auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(),
cudf::get_current_device_resource_ref());
write_to_parquet_device_buffer(nation, SCHEMAS.at("nation"), sources.at("nation"));
}

auto region = cudf::datagen::generate_region(cudf::get_default_stream(),
cudf::get_current_device_resource_ref());
if (sources.count("region")) {
auto region = cudf::datagen::generate_region(cudf::get_default_stream(),
cudf::get_current_device_resource_ref());
write_to_parquet_device_buffer(region, SCHEMAS.at("region"), sources.at("region"));
}

write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources["orders"]);
write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources["lineitem"]);
write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources["part"]);
write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources["partsupp"]);
write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources["customer"]);
write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]);
write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]);
write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]);
// Restore the original memory resource
if (!is_managed) { cudf::set_current_device_resource(old_mr); }
}
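
Putting the pieces together, a hedged end-to-end sketch of how a benchmark consumes these utilities (mirroring ndsh_q1 above; plain cudf::io calls stand in for the benchmark's local read_parquet helper):

```cpp
std::unordered_map<std::string, cuio_source_sink_pair> sources;
generate_parquet_data_sources(/*scale_factor=*/1.0, {"lineitem"}, sources);

// Read the generated table back out of its host buffer.
auto const options =
  cudf::io::parquet_reader_options::builder(sources.at("lineitem").make_source_info()).build();
auto const lineitem = cudf::io::read_parquet(options);  // table_with_metadata
```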
(The diff for the eighth changed file did not load.)