From 8f7666cc72fe7d9f53c64c30524eb8737382aa36 Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Thu, 10 Oct 2024 04:58:26 +0000 Subject: [PATCH 01/12] replace parquet_device_buffer with cuio_source_sink_pair --- cpp/benchmarks/ndsh/utilities.cpp | 12 +++++++----- cpp/benchmarks/ndsh/utilities.hpp | 17 +++++------------ 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index 62116ddf661..4669d8afa1b 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -337,7 +337,7 @@ int32_t days_since_epoch(int year, int month, int day) void write_to_parquet_device_buffer(std::unique_ptr const& table, std::vector const& col_names, - parquet_device_buffer& source) + cuio_source_sink_pair& source) { CUDF_FUNC_RANGE(); auto const stream = cudf::get_default_stream(); @@ -356,10 +356,12 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, // Write parquet data to host buffer auto builder = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), table->view()); + cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view()); builder.metadata(table_input_metadata); auto const options = builder.build(); - cudf::io::write_parquet(options); + cudf::io::write_parquet(options, stream); + // stream.synchronize(); +} // Copy host buffer to device buffer source.d_buffer.resize(h_buffer.size(), stream); @@ -369,11 +371,11 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, void generate_parquet_data_sources(double scale_factor, std::vector const& table_names, - std::unordered_map& sources) + std::unordered_map& sources) { CUDF_FUNC_RANGE(); std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) { - sources[table_name] = parquet_device_buffer(); + sources[table_name] = cuio_source_sink_pair(io_type::HOST_BUFFER); }); auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( diff --git a/cpp/benchmarks/ndsh/utilities.hpp b/cpp/benchmarks/ndsh/utilities.hpp index 762e43deccf..4d42b540e21 100644 --- a/cpp/benchmarks/ndsh/utilities.hpp +++ b/cpp/benchmarks/ndsh/utilities.hpp @@ -14,6 +14,8 @@ * limitations under the License. */ +#include "io/cuio_common.hpp" + #include #include #include @@ -195,25 +197,16 @@ std::tm make_tm(int year, int month, int day); */ int32_t days_since_epoch(int year, int month, int day); -/** - * @brief Struct representing a parquet device buffer - */ -struct parquet_device_buffer { - parquet_device_buffer() : d_buffer{0, cudf::get_default_stream()} {}; - cudf::io::source_info make_source_info() { return cudf::io::source_info(d_buffer); } - rmm::device_uvector d_buffer; -}; - /** * @brief Write a `cudf::table` to a parquet device buffer * * @param table The `cudf::table` to write * @param col_names The column names of the table - * @param parquet_device_buffer The parquet device buffer to write the table to + * @param source The source sink pair to write the table to */ void write_to_parquet_device_buffer(std::unique_ptr const& table, std::vector const& col_names, - parquet_device_buffer& source); + cuio_source_sink_pair& source); /** * @brief Generate NDS-H tables and write to parquet device buffers @@ -224,4 +217,4 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, */ void generate_parquet_data_sources(double scale_factor, std::vector const& table_names, - std::unordered_map& sources); + std::unordered_map& sources); From b68ad604c214960fc5ced9daf779d99629be8494 Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Thu, 10 Oct 2024 04:59:10 +0000 Subject: [PATCH 02/12] replace parquet_device_buffer with cuio_source_sink_pair --- cpp/benchmarks/ndsh/q01.cpp | 2 +- cpp/benchmarks/ndsh/q05.cpp | 4 ++-- cpp/benchmarks/ndsh/q06.cpp | 4 ++-- cpp/benchmarks/ndsh/q09.cpp | 4 ++-- cpp/benchmarks/ndsh/q10.cpp | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/benchmarks/ndsh/q01.cpp b/cpp/benchmarks/ndsh/q01.cpp index ef709926ae9..a9bb44227b7 100644 --- a/cpp/benchmarks/ndsh/q01.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -170,7 +170,7 @@ void ndsh_q1(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); auto stream = cudf::get_default_stream(); diff --git a/cpp/benchmarks/ndsh/q05.cpp b/cpp/benchmarks/ndsh/q05.cpp index 522bc4789c2..e41f3b74728 100644 --- a/cpp/benchmarks/ndsh/q05.cpp +++ b/cpp/benchmarks/ndsh/q05.cpp @@ -89,7 +89,7 @@ } void run_ndsh_q5(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; @@ -165,7 +165,7 @@ void ndsh_q5(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources( scale_factor, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources); diff --git a/cpp/benchmarks/ndsh/q06.cpp b/cpp/benchmarks/ndsh/q06.cpp index 04078547973..fe9e38d5729 100644 --- a/cpp/benchmarks/ndsh/q06.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -64,7 +64,7 @@ } void run_ndsh_q6(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Read out the `lineitem` table from parquet file std::vector const lineitem_cols = { @@ -134,7 +134,7 @@ void ndsh_q6(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); auto stream = cudf::get_default_stream(); diff --git a/cpp/benchmarks/ndsh/q09.cpp b/cpp/benchmarks/ndsh/q09.cpp index 59218ab8912..f14393b5cfc 100644 --- a/cpp/benchmarks/ndsh/q09.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -112,7 +112,7 @@ } void run_ndsh_q9(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Read out the table from parquet files auto const lineitem = read_parquet( @@ -178,7 +178,7 @@ void ndsh_q9(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources( scale_factor, {"part", "supplier", "lineitem", "partsupp", "orders", "nation"}, sources); diff --git a/cpp/benchmarks/ndsh/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp index a520480020a..65b838bc972 100644 --- a/cpp/benchmarks/ndsh/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -94,7 +94,7 @@ } void run_ndsh_q10(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; @@ -163,7 +163,7 @@ void ndsh_q10(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources( scale_factor, {"customer", "orders", "lineitem", "nation"}, sources); From 4112bcaddd83d6c6c5fef7582d7fe82c8227de47 Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Thu, 10 Oct 2024 05:00:10 +0000 Subject: [PATCH 03/12] use managed pool memory --- cpp/benchmarks/ndsh/utilities.cpp | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index 4669d8afa1b..cd1291a50e1 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -30,6 +30,10 @@ #include #include +#include +#include +#include + #include #include @@ -363,16 +367,22 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, // stream.synchronize(); } - // Copy host buffer to device buffer - source.d_buffer.resize(h_buffer.size(), stream); - CUDF_CUDA_TRY(cudaMemcpyAsync( - source.d_buffer.data(), h_buffer.data(), h_buffer.size(), cudaMemcpyDefault, stream.value())); +inline auto make_managed_pool() +{ + return rmm::mr::make_owning_wrapper( + std::make_shared(), rmm::percent_of_free_device_memory(50)); } void generate_parquet_data_sources(double scale_factor, std::vector const& table_names, std::unordered_map& sources) { + // Set the memory resource to the managed pool + auto old_mr = cudf::get_current_device_resource(); // fixme: already pool takes 50% of free memory + // TODO: release it, and restore it later? + auto managed_pool_mr = make_managed_pool(); + cudf::set_current_device_resource(managed_pool_mr.get()); + CUDF_FUNC_RANGE(); std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) { sources[table_name] = cuio_source_sink_pair(io_type::HOST_BUFFER); @@ -404,4 +414,6 @@ void generate_parquet_data_sources(double scale_factor, write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]); write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]); write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]); + // Restore the original memory resource + cudf::set_current_device_resource(old_mr); } From 813d127bf6018caca84aa606bffb6dedc17acff2 Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Thu, 10 Oct 2024 05:00:29 +0000 Subject: [PATCH 04/12] link cudf_benchmark_common --- cpp/benchmarks/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index b0f75b25975..b10dccbe2cd 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -49,7 +49,7 @@ target_compile_options( target_link_libraries( ndsh_data_generator - PUBLIC cudf cudftestutil nvtx3::nvtx3-cpp + PUBLIC cudf cudftestutil nvtx3::nvtx3-cpp cudf_benchmark_common PRIVATE $ ) From f3ac9c2527bbbc4b625d0ab228e734893d60272f Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Thu, 10 Oct 2024 05:11:12 +0000 Subject: [PATCH 05/12] fix missed replacement --- cpp/benchmarks/ndsh/q01.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/benchmarks/ndsh/q01.cpp b/cpp/benchmarks/ndsh/q01.cpp index a9bb44227b7..a846beef5bc 100644 --- a/cpp/benchmarks/ndsh/q01.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -104,7 +104,7 @@ } void run_ndsh_q1(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Define the column projections and filter predicate for `lineitem` table std::vector const lineitem_cols = {"l_returnflag", From 05773a97a3d2a02f1a53ad6f433747f00c43641e Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Thu, 10 Oct 2024 05:11:46 +0000 Subject: [PATCH 06/12] avoid default ctor usage with map.at call --- cpp/benchmarks/ndsh/q01.cpp | 2 +- cpp/benchmarks/ndsh/q05.cpp | 12 ++++++------ cpp/benchmarks/ndsh/q06.cpp | 2 +- cpp/benchmarks/ndsh/q09.cpp | 12 ++++++------ cpp/benchmarks/ndsh/q10.cpp | 8 ++++---- cpp/benchmarks/ndsh/utilities.cpp | 18 +++++++++--------- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/cpp/benchmarks/ndsh/q01.cpp b/cpp/benchmarks/ndsh/q01.cpp index a846beef5bc..7ac4c0bd3fc 100644 --- a/cpp/benchmarks/ndsh/q01.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -125,7 +125,7 @@ void run_ndsh_q1(nvbench::state& state, // Read out the `lineitem` table from parquet file auto lineitem = - read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Calculate the discount price and charge columns and append to lineitem table auto disc_price = diff --git a/cpp/benchmarks/ndsh/q05.cpp b/cpp/benchmarks/ndsh/q05.cpp index e41f3b74728..1c2d657913e 100644 --- a/cpp/benchmarks/ndsh/q05.cpp +++ b/cpp/benchmarks/ndsh/q05.cpp @@ -120,17 +120,17 @@ void run_ndsh_q5(nvbench::state& state, // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = - read_parquet(sources["customer"].make_source_info(), {"c_custkey", "c_nationkey"}); + read_parquet(sources.at("customer").make_source_info(), {"c_custkey", "c_nationkey"}); auto const orders = - read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); - auto const lineitem = read_parquet(sources["lineitem"].make_source_info(), + read_parquet(sources.at("orders").make_source_info(), orders_cols, std::move(orders_pred)); + auto const lineitem = read_parquet(sources.at("lineitem").make_source_info(), {"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"}); auto const supplier = - read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); + read_parquet(sources.at("supplier").make_source_info(), {"s_suppkey", "s_nationkey"}); auto const nation = - read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_regionkey", "n_name"}); + read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_regionkey", "n_name"}); auto const region = - read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred)); + read_parquet(sources.at("region").make_source_info(), region_cols, std::move(region_pred)); // Perform the joins auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"}); diff --git a/cpp/benchmarks/ndsh/q06.cpp b/cpp/benchmarks/ndsh/q06.cpp index fe9e38d5729..5dfdb5c525b 100644 --- a/cpp/benchmarks/ndsh/q06.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -84,7 +84,7 @@ void run_ndsh_q6(nvbench::state& state, auto const lineitem_pred = std::make_unique( cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); auto lineitem = - read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); + read_parquet(sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Cast the discount and quantity columns to float32 and append to lineitem table auto discout_float = diff --git a/cpp/benchmarks/ndsh/q09.cpp b/cpp/benchmarks/ndsh/q09.cpp index f14393b5cfc..3e6b8bdaee9 100644 --- a/cpp/benchmarks/ndsh/q09.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -116,16 +116,16 @@ void run_ndsh_q9(nvbench::state& state, { // Read out the table from parquet files auto const lineitem = read_parquet( - sources["lineitem"].make_source_info(), + sources.at("lineitem").make_source_info(), {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); - auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_name"}); + auto const nation = read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_name"}); auto const orders = - read_parquet(sources["orders"].make_source_info(), {"o_orderkey", "o_orderdate"}); - auto const part = read_parquet(sources["part"].make_source_info(), {"p_partkey", "p_name"}); - auto const partsupp = read_parquet(sources["partsupp"].make_source_info(), + read_parquet(sources.at("orders").make_source_info(), {"o_orderkey", "o_orderdate"}); + auto const part = read_parquet(sources.at("part").make_source_info(), {"p_partkey", "p_name"}); + auto const partsupp = read_parquet(sources.at("partsupp").make_source_info(), {"ps_suppkey", "ps_partkey", "ps_supplycost"}); auto const supplier = - read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); + read_parquet(sources.at("supplier").make_source_info(), {"s_suppkey", "s_nationkey"}); // Generating the `profit` table // Filter the part table using `p_name like '%green%'` diff --git a/cpp/benchmarks/ndsh/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp index 65b838bc972..25299e921ba 100644 --- a/cpp/benchmarks/ndsh/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -122,15 +122,15 @@ void run_ndsh_q10(nvbench::state& state, // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = read_parquet( - sources["customer"].make_source_info(), + sources.at("customer").make_source_info(), {"c_custkey", "c_name", "c_nationkey", "c_acctbal", "c_address", "c_phone", "c_comment"}); auto const orders = - read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); + read_parquet(sources.at("orders").make_source_info(), orders_cols, std::move(orders_pred)); auto const lineitem = - read_parquet(sources["lineitem"].make_source_info(), + read_parquet(sources.at("lineitem").make_source_info(), {"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"}, std::move(lineitem_pred)); - auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"}); + auto const nation = read_parquet(sources.at("nation").make_source_info(), {"n_name", "n_nationkey"}); // Perform the joins auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"}); diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index cd1291a50e1..7696cda8a6d 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -385,7 +385,7 @@ void generate_parquet_data_sources(double scale_factor, CUDF_FUNC_RANGE(); std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) { - sources[table_name] = cuio_source_sink_pair(io_type::HOST_BUFFER); + sources.emplace(table_name, cuio_source_sink_pair(io_type::HOST_BUFFER)); }); auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( @@ -406,14 +406,14 @@ void generate_parquet_data_sources(double scale_factor, auto region = cudf::datagen::generate_region(cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources["orders"]); - write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources["lineitem"]); - write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources["part"]); - write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources["partsupp"]); - write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources["customer"]); - write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]); - write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]); - write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]); + write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources.at("orders")); + write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources.at("lineitem")); + write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources.at("part")); + write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources.at("partsupp")); + write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources.at("customer")); + write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources.at("supplier")); + write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources.at("nation")); + write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources.at("region")); // Restore the original memory resource cudf::set_current_device_resource(old_mr); } From 0054933adf5377f96d94999dd0ef4e0a059d2a3f Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Fri, 11 Oct 2024 04:35:18 +0000 Subject: [PATCH 07/12] generate only input table names --- cpp/benchmarks/ndsh/utilities.cpp | 84 ++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 28 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index 7696cda8a6d..dd9c357694a 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -89,6 +89,15 @@ std::vector const NATION_SCHEMA = { "n_nationkey", "n_name", "n_regionkey", "n_comment"}; std::vector const REGION_SCHEMA = {"r_regionkey", "r_name", "r_comment"}; +std::unordered_map const> const SCHEMAS = { + {"orders", ORDERS_SCHEMA}, + {"lineitem", LINEITEM_SCHEMA}, + {"part", PART_SCHEMA}, + {"partsupp", PARTSUPP_SCHEMA}, + {"supplier", SUPPLIER_SCHEMA}, + {"customer", CUSTOMER_SCHEMA}, + {"nation", NATION_SCHEMA}, + {"region", REGION_SCHEMA}}; } // namespace cudf::table_view table_with_names::table() const { return tbl->view(); } @@ -359,12 +368,10 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, std::vector h_buffer; // Write parquet data to host buffer - auto builder = - cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view()); + auto builder = cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view()); builder.metadata(table_input_metadata); auto const options = builder.build(); cudf::io::write_parquet(options, stream); - // stream.synchronize(); } inline auto make_managed_pool() @@ -377,43 +384,64 @@ void generate_parquet_data_sources(double scale_factor, std::vector const& table_names, std::unordered_map& sources) { + CUDF_FUNC_RANGE(); + // Set the memory resource to the managed pool auto old_mr = cudf::get_current_device_resource(); // fixme: already pool takes 50% of free memory // TODO: release it, and restore it later? auto managed_pool_mr = make_managed_pool(); cudf::set_current_device_resource(managed_pool_mr.get()); - CUDF_FUNC_RANGE(); - std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) { - sources.emplace(table_name, cuio_source_sink_pair(io_type::HOST_BUFFER)); - }); - - auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( - scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + std::vector const requested_table_names = [&table_names]() { + if (table_names.empty()) { + return std::vector{ + "orders", "lineitem", "part", "partsupp", "supplier", "customer", "nation", "region"}; + } + return table_names; + }(); + std::for_each( + requested_table_names.begin(), requested_table_names.end(), [&](auto const& table_name) { + sources.emplace(table_name, cuio_source_sink_pair(io_type::HOST_BUFFER)); + }); + std::unordered_map> tables; + + if (sources.count("orders") or sources.count("lineitem") or sources.count("part")) { + auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + if (sources.count("orders")) tables["orders"] = std::move(orders); + if (sources.count("lineitem")) tables["lineitem"] = std::move(lineitem); + if (sources.count("part")) tables["part"] = std::move(part); + } - auto partsupp = cudf::datagen::generate_partsupp( - scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + if (sources.count("partsupp")) { + tables["partsupp"] = cudf::datagen::generate_partsupp( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + } - auto supplier = cudf::datagen::generate_supplier( - scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + if (sources.count("supplier")) { + tables["supplier"] = cudf::datagen::generate_supplier( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + } - auto customer = cudf::datagen::generate_customer( - scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + if (sources.count("customer")) { + tables["customer"] = cudf::datagen::generate_customer( + scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + } - auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(), - cudf::get_current_device_resource_ref()); + if (sources.count("nation")) { + tables["nation"] = cudf::datagen::generate_nation(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + } - auto region = cudf::datagen::generate_region(cudf::get_default_stream(), - cudf::get_current_device_resource_ref()); + if (sources.count("region")) { + tables["region"] = cudf::datagen::generate_region(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + } - write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources.at("orders")); - write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources.at("lineitem")); - write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources.at("part")); - write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources.at("partsupp")); - write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources.at("customer")); - write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources.at("supplier")); - write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources.at("nation")); - write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources.at("region")); + for (auto const& table_name : requested_table_names) { + write_to_parquet_device_buffer( + tables.at(table_name), SCHEMAS.at(table_name), sources.at(table_name)); + } // Restore the original memory resource cudf::set_current_device_resource(old_mr); } From fc9e71d2bfd614c85f0a02d557548433e5253d37 Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Fri, 11 Oct 2024 04:35:49 +0000 Subject: [PATCH 08/12] create managed pool, if not already managed --- cpp/benchmarks/ndsh/utilities.cpp | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index dd9c357694a..b1b7c720c7c 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -387,10 +387,20 @@ void generate_parquet_data_sources(double scale_factor, CUDF_FUNC_RANGE(); // Set the memory resource to the managed pool - auto old_mr = cudf::get_current_device_resource(); // fixme: already pool takes 50% of free memory - // TODO: release it, and restore it later? - auto managed_pool_mr = make_managed_pool(); - cudf::set_current_device_resource(managed_pool_mr.get()); + auto old_mr = cudf::get_current_device_resource(); + // if already managed pool or managed, don't create new one. + using managed_pool_mr_t = decltype(make_managed_pool()); + managed_pool_mr_t managed_pool_mr; + bool const is_managed = + dynamic_cast*>(old_mr) or + dynamic_cast(old_mr); + if (!is_managed) { + std::cout << "Creating managed pool just for data generation\n"; + managed_pool_mr = make_managed_pool(); + cudf::set_current_device_resource(managed_pool_mr.get()); + // drawback: if already pool takes 50% of free memory, we are left with 50% of 50% of free + // memory. + } std::vector const requested_table_names = [&table_names]() { if (table_names.empty()) { @@ -443,5 +453,5 @@ void generate_parquet_data_sources(double scale_factor, tables.at(table_name), SCHEMAS.at(table_name), sources.at(table_name)); } // Restore the original memory resource - cudf::set_current_device_resource(old_mr); + if (!is_managed) { cudf::set_current_device_resource(old_mr); } } From d6935dc3582198dd773bbd5e9e0aaff9cbdbf9c9 Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Fri, 11 Oct 2024 04:47:29 +0000 Subject: [PATCH 09/12] style fix --- cpp/benchmarks/ndsh/q01.cpp | 4 ++-- cpp/benchmarks/ndsh/q06.cpp | 4 ++-- cpp/benchmarks/ndsh/q09.cpp | 5 +++-- cpp/benchmarks/ndsh/q10.cpp | 3 ++- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/benchmarks/ndsh/q01.cpp b/cpp/benchmarks/ndsh/q01.cpp index 7ac4c0bd3fc..485e8e5497c 100644 --- a/cpp/benchmarks/ndsh/q01.cpp +++ b/cpp/benchmarks/ndsh/q01.cpp @@ -124,8 +124,8 @@ void run_ndsh_q1(nvbench::state& state, cudf::ast::ast_operator::LESS_EQUAL, shipdate_ref, shipdate_upper_literal); // Read out the `lineitem` table from parquet file - auto lineitem = - read_parquet(sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); + auto lineitem = read_parquet( + sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Calculate the discount price and charge columns and append to lineitem table auto disc_price = diff --git a/cpp/benchmarks/ndsh/q06.cpp b/cpp/benchmarks/ndsh/q06.cpp index 5dfdb5c525b..e1e56c3622e 100644 --- a/cpp/benchmarks/ndsh/q06.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -83,8 +83,8 @@ void run_ndsh_q6(nvbench::state& state, cudf::ast::operation(cudf::ast::ast_operator::LESS, shipdate_ref, shipdate_upper_literal); auto const lineitem_pred = std::make_unique( cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); - auto lineitem = - read_parquet(sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); + auto lineitem = read_parquet( + sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Cast the discount and quantity columns to float32 and append to lineitem table auto discout_float = diff --git a/cpp/benchmarks/ndsh/q09.cpp b/cpp/benchmarks/ndsh/q09.cpp index 3e6b8bdaee9..2e9a69d9ee2 100644 --- a/cpp/benchmarks/ndsh/q09.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -118,10 +118,11 @@ void run_ndsh_q9(nvbench::state& state, auto const lineitem = read_parquet( sources.at("lineitem").make_source_info(), {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); - auto const nation = read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_name"}); + auto const nation = + read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_name"}); auto const orders = read_parquet(sources.at("orders").make_source_info(), {"o_orderkey", "o_orderdate"}); - auto const part = read_parquet(sources.at("part").make_source_info(), {"p_partkey", "p_name"}); + auto const part = read_parquet(sources.at("part").make_source_info(), {"p_partkey", "p_name"}); auto const partsupp = read_parquet(sources.at("partsupp").make_source_info(), {"ps_suppkey", "ps_partkey", "ps_supplycost"}); auto const supplier = diff --git a/cpp/benchmarks/ndsh/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp index 25299e921ba..72edd15083d 100644 --- a/cpp/benchmarks/ndsh/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -130,7 +130,8 @@ void run_ndsh_q10(nvbench::state& state, read_parquet(sources.at("lineitem").make_source_info(), {"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"}, std::move(lineitem_pred)); - auto const nation = read_parquet(sources.at("nation").make_source_info(), {"n_name", "n_nationkey"}); + auto const nation = + read_parquet(sources.at("nation").make_source_info(), {"n_name", "n_nationkey"}); // Perform the joins auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"}); From ae63f0722e4a532b23b0a696f1661c5c787135a1 Mon Sep 17 00:00:00 2001 From: Karthikeyan <6488848+karthikeyann@users.noreply.github.com> Date: Fri, 18 Oct 2024 14:27:09 -0500 Subject: [PATCH 10/12] Update cpp/benchmarks/ndsh/utilities.hpp --- cpp/benchmarks/ndsh/utilities.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/benchmarks/ndsh/utilities.hpp b/cpp/benchmarks/ndsh/utilities.hpp index 4d42b540e21..cae07f86a98 100644 --- a/cpp/benchmarks/ndsh/utilities.hpp +++ b/cpp/benchmarks/ndsh/utilities.hpp @@ -198,7 +198,7 @@ std::tm make_tm(int year, int month, int day); int32_t days_since_epoch(int year, int month, int day); /** - * @brief Write a `cudf::table` to a parquet device buffer + * @brief Write a `cudf::table` to a parquet cuio sink * * @param table The `cudf::table` to write * @param col_names The column names of the table From c034731bbeab00f7935474e7decd8b57b7fa05dc Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Sat, 19 Oct 2024 15:57:24 +0000 Subject: [PATCH 11/12] interleave parquet writing to reduce memory usage --- cpp/benchmarks/ndsh/utilities.cpp | 44 +++++++++++++++++++------------ 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index b1b7c720c7c..c19c1361ea2 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -402,12 +402,12 @@ void generate_parquet_data_sources(double scale_factor, // memory. } - std::vector const requested_table_names = [&table_names]() { + std::unordered_set const requested_table_names = [&table_names]() { if (table_names.empty()) { - return std::vector{ + return std::unordered_set{ "orders", "lineitem", "part", "partsupp", "supplier", "customer", "nation", "region"}; } - return table_names; + return std::unordered_set(table_names.begin(), table_names.end()); }(); std::for_each( requested_table_names.begin(), requested_table_names.end(), [&](auto const& table_name) { @@ -418,40 +418,50 @@ void generate_parquet_data_sources(double scale_factor, if (sources.count("orders") or sources.count("lineitem") or sources.count("part")) { auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part( scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); - if (sources.count("orders")) tables["orders"] = std::move(orders); - if (sources.count("lineitem")) tables["lineitem"] = std::move(lineitem); - if (sources.count("part")) tables["part"] = std::move(part); + if (sources.count("orders")) { + write_to_parquet_device_buffer(orders, SCHEMAS.at("orders"), sources.at("orders")); + orders = {}; + } + if (sources.count("part")) { + write_to_parquet_device_buffer(part, SCHEMAS.at("part"), sources.at("part")); + part = {}; + } + if (sources.count("lineitem")) { + write_to_parquet_device_buffer(lineitem, SCHEMAS.at("lineitem"), sources.at("lineitem")); + lineitem = {}; + } } if (sources.count("partsupp")) { - tables["partsupp"] = cudf::datagen::generate_partsupp( + auto partsupp = cudf::datagen::generate_partsupp( scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + write_to_parquet_device_buffer(partsupp, SCHEMAS.at("partsupp"), sources.at("partsupp")); } if (sources.count("supplier")) { - tables["supplier"] = cudf::datagen::generate_supplier( + auto supplier = cudf::datagen::generate_supplier( scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + write_to_parquet_device_buffer(supplier, SCHEMAS.at("supplier"), sources.at("supplier")); } if (sources.count("customer")) { - tables["customer"] = cudf::datagen::generate_customer( + auto customer = cudf::datagen::generate_customer( scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref()); + write_to_parquet_device_buffer(customer, SCHEMAS.at("customer"), sources.at("customer")); } if (sources.count("nation")) { - tables["nation"] = cudf::datagen::generate_nation(cudf::get_default_stream(), - cudf::get_current_device_resource_ref()); + auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + write_to_parquet_device_buffer(nation, SCHEMAS.at("nation"), sources.at("nation")); } if (sources.count("region")) { - tables["region"] = cudf::datagen::generate_region(cudf::get_default_stream(), - cudf::get_current_device_resource_ref()); + auto region = cudf::datagen::generate_region(cudf::get_default_stream(), + cudf::get_current_device_resource_ref()); + write_to_parquet_device_buffer(region, SCHEMAS.at("region"), sources.at("region")); } - for (auto const& table_name : requested_table_names) { - write_to_parquet_device_buffer( - tables.at(table_name), SCHEMAS.at(table_name), sources.at(table_name)); - } // Restore the original memory resource if (!is_managed) { cudf::set_current_device_resource(old_mr); } } From d3db1aeb34fa191f67c4d4f6d195cef8fa08b71d Mon Sep 17 00:00:00 2001 From: Karthikeyan Natarajan Date: Sat, 19 Oct 2024 15:57:59 +0000 Subject: [PATCH 12/12] chunked parquet writing for reducing memory writing lineitem table --- cpp/benchmarks/ndsh/utilities.cpp | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index c19c1361ea2..9f9849860c9 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ -17,6 +17,8 @@ #include "utilities.hpp" #include "common/ndsh_data_generator/ndsh_data_generator.hpp" +#include "common/table_utilities.hpp" +#include "cudf/detail/utilities/integer_utils.hpp" #include #include @@ -34,8 +36,11 @@ #include #include +#include #include #include +#include +#include namespace { @@ -364,9 +369,29 @@ void write_to_parquet_device_buffer(std::unique_ptr const& table, metadata.schema_info = col_name_infos; auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; - // Declare a host and device buffer - std::vector h_buffer; - + auto est_size = static_cast(estimate_size(table->view())); + constexpr auto PQ_MAX_TABLE_BYTES = 8ul << 30; // 8GB + // TODO: best to get this limit from percent_of_free_device_memory(50) of device memory resource. + if (est_size > PQ_MAX_TABLE_BYTES) { + auto builder = cudf::io::chunked_parquet_writer_options::builder(source.make_sink_info()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + auto num_splits = static_cast( + std::ceil(static_cast(est_size) / (PQ_MAX_TABLE_BYTES))); + std::vector splits(num_splits - 1); + auto num_rows = table->num_rows(); + auto num_row_per_chunk = cudf::util::div_rounding_up_safe(num_rows, num_splits); + std::generate_n(splits.begin(), splits.size(), [num_row_per_chunk, i = 0]() mutable { + return (i += num_row_per_chunk); + }); + std::vector split_tables = cudf::split(table->view(), splits, stream); + auto writer = cudf::io::parquet_chunked_writer(options, stream); + for (auto const& chunk_table : split_tables) { + writer.write(chunk_table); + } + writer.close(); + return; + } // Write parquet data to host buffer auto builder = cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view()); builder.metadata(table_input_metadata);