From e7653a70743a76ad3c8ca4b377aa0ec4303e5556 Mon Sep 17 00:00:00 2001
From: Karthikeyan <6488848+karthikeyann@users.noreply.github.com>
Date: Wed, 23 Oct 2024 13:23:04 -0500
Subject: [PATCH] Use managed memory for NDSH benchmarks (#17039)

Fixes https://github.com/rapidsai/cudf/issues/16987

Use managed memory to generate the parquet data, and write the parquet
data to a host buffer. Replace the use of parquet_device_buffer with
cuio_source_sink_pair.

Authors:
  - Karthikeyan (https://github.com/karthikeyann)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Tianyu Liu (https://github.com/kingcrimsontianyu)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

URL: https://github.com/rapidsai/cudf/pull/17039
---
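Notes: the heart of the change is swapping the current device resource for a
pool built on managed memory while the NDS-H tables are generated, then
restoring the original resource before the timed benchmark region, so that
data generation at large scale factors can oversubscribe device memory
instead of failing. A minimal sketch of that idea (not part of the patch
itself; headers omitted, the wrapper function is illustrative, and the 50%
pool size mirrors the helper added in utilities.cpp rather than any rmm
default):

    // Pool suballocator on top of cudaMallocManaged: allocations can
    // exceed physical GPU memory and page between device and host.
    auto make_managed_pool()
    {
      return rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(
        std::make_shared<rmm::mr::managed_memory_resource>(),
        rmm::percent_of_free_device_memory(50));
    }

    void generate_with_managed_memory()  // illustrative wrapper
    {
      auto* old_mr = cudf::get_current_device_resource();
      auto managed = make_managed_pool();
      cudf::set_current_device_resource(managed.get());
      // ... generate NDS-H tables; parquet bytes land in host-buffer sinks ...
      cudf::set_current_device_resource(old_mr);  // timed region uses the original resource
    }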
{"c_custkey", "c_nationkey"}); + read_parquet(sources.at("customer").make_source_info(), {"c_custkey", "c_nationkey"}); auto const orders = - read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); - auto const lineitem = read_parquet(sources["lineitem"].make_source_info(), + read_parquet(sources.at("orders").make_source_info(), orders_cols, std::move(orders_pred)); + auto const lineitem = read_parquet(sources.at("lineitem").make_source_info(), {"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"}); auto const supplier = - read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); + read_parquet(sources.at("supplier").make_source_info(), {"s_suppkey", "s_nationkey"}); auto const nation = - read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_regionkey", "n_name"}); + read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_regionkey", "n_name"}); auto const region = - read_parquet(sources["region"].make_source_info(), region_cols, std::move(region_pred)); + read_parquet(sources.at("region").make_source_info(), region_cols, std::move(region_pred)); // Perform the joins auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"}); @@ -165,7 +165,7 @@ void ndsh_q5(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources( scale_factor, {"customer", "orders", "lineitem", "supplier", "nation", "region"}, sources); diff --git a/cpp/benchmarks/ndsh/q06.cpp b/cpp/benchmarks/ndsh/q06.cpp index 04078547973..e1e56c3622e 100644 --- a/cpp/benchmarks/ndsh/q06.cpp +++ b/cpp/benchmarks/ndsh/q06.cpp @@ -64,7 +64,7 @@ } void run_ndsh_q6(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Read out the `lineitem` table from parquet file std::vector const lineitem_cols = { @@ -83,8 +83,8 @@ void run_ndsh_q6(nvbench::state& state, cudf::ast::operation(cudf::ast::ast_operator::LESS, shipdate_ref, shipdate_upper_literal); auto const lineitem_pred = std::make_unique( cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); - auto lineitem = - read_parquet(sources["lineitem"].make_source_info(), lineitem_cols, std::move(lineitem_pred)); + auto lineitem = read_parquet( + sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred)); // Cast the discount and quantity columns to float32 and append to lineitem table auto discout_float = @@ -134,7 +134,7 @@ void ndsh_q6(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources(scale_factor, {"lineitem"}, sources); auto stream = cudf::get_default_stream(); diff --git a/cpp/benchmarks/ndsh/q09.cpp b/cpp/benchmarks/ndsh/q09.cpp index 59218ab8912..2e9a69d9ee2 100644 --- a/cpp/benchmarks/ndsh/q09.cpp +++ b/cpp/benchmarks/ndsh/q09.cpp @@ -112,20 +112,21 @@ } void run_ndsh_q9(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Read out the table from parquet files auto const lineitem = read_parquet( - sources["lineitem"].make_source_info(), + sources.at("lineitem").make_source_info(), {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); - auto const nation = 
read_parquet(sources["nation"].make_source_info(), {"n_nationkey", "n_name"}); + auto const nation = + read_parquet(sources.at("nation").make_source_info(), {"n_nationkey", "n_name"}); auto const orders = - read_parquet(sources["orders"].make_source_info(), {"o_orderkey", "o_orderdate"}); - auto const part = read_parquet(sources["part"].make_source_info(), {"p_partkey", "p_name"}); - auto const partsupp = read_parquet(sources["partsupp"].make_source_info(), + read_parquet(sources.at("orders").make_source_info(), {"o_orderkey", "o_orderdate"}); + auto const part = read_parquet(sources.at("part").make_source_info(), {"p_partkey", "p_name"}); + auto const partsupp = read_parquet(sources.at("partsupp").make_source_info(), {"ps_suppkey", "ps_partkey", "ps_supplycost"}); auto const supplier = - read_parquet(sources["supplier"].make_source_info(), {"s_suppkey", "s_nationkey"}); + read_parquet(sources.at("supplier").make_source_info(), {"s_suppkey", "s_nationkey"}); // Generating the `profit` table // Filter the part table using `p_name like '%green%'` @@ -178,7 +179,7 @@ void ndsh_q9(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources( scale_factor, {"part", "supplier", "lineitem", "partsupp", "orders", "nation"}, sources); diff --git a/cpp/benchmarks/ndsh/q10.cpp b/cpp/benchmarks/ndsh/q10.cpp index a520480020a..72edd15083d 100644 --- a/cpp/benchmarks/ndsh/q10.cpp +++ b/cpp/benchmarks/ndsh/q10.cpp @@ -94,7 +94,7 @@ } void run_ndsh_q10(nvbench::state& state, - std::unordered_map& sources) + std::unordered_map& sources) { // Define the column projection and filter predicate for the `orders` table std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; @@ -122,15 +122,16 @@ void run_ndsh_q10(nvbench::state& state, // Read out the tables from parquet files // while pushing down the column projections and filter predicates auto const customer = read_parquet( - sources["customer"].make_source_info(), + sources.at("customer").make_source_info(), {"c_custkey", "c_name", "c_nationkey", "c_acctbal", "c_address", "c_phone", "c_comment"}); auto const orders = - read_parquet(sources["orders"].make_source_info(), orders_cols, std::move(orders_pred)); + read_parquet(sources.at("orders").make_source_info(), orders_cols, std::move(orders_pred)); auto const lineitem = - read_parquet(sources["lineitem"].make_source_info(), + read_parquet(sources.at("lineitem").make_source_info(), {"l_extendedprice", "l_discount", "l_orderkey", "l_returnflag"}, std::move(lineitem_pred)); - auto const nation = read_parquet(sources["nation"].make_source_info(), {"n_name", "n_nationkey"}); + auto const nation = + read_parquet(sources.at("nation").make_source_info(), {"n_name", "n_nationkey"}); // Perform the joins auto const join_a = apply_inner_join(customer, nation, {"c_nationkey"}, {"n_nationkey"}); @@ -163,7 +164,7 @@ void ndsh_q10(nvbench::state& state) { // Generate the required parquet files in device buffers double const scale_factor = state.get_float64("scale_factor"); - std::unordered_map sources; + std::unordered_map sources; generate_parquet_data_sources( scale_factor, {"customer", "orders", "lineitem", "nation"}, sources); diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp index 62116ddf661..9f9849860c9 100644 --- a/cpp/benchmarks/ndsh/utilities.cpp +++ b/cpp/benchmarks/ndsh/utilities.cpp @@ 
diff --git a/cpp/benchmarks/ndsh/utilities.cpp b/cpp/benchmarks/ndsh/utilities.cpp
index 62116ddf661..9f9849860c9 100644
--- a/cpp/benchmarks/ndsh/utilities.cpp
+++ b/cpp/benchmarks/ndsh/utilities.cpp
@@ -17,6 +17,8 @@
 #include "utilities.hpp"
 
 #include "common/ndsh_data_generator/ndsh_data_generator.hpp"
+#include "common/table_utilities.hpp"
+#include "cudf/detail/utilities/integer_utils.hpp"
 
 #include
 #include
@@ -30,8 +32,15 @@
 #include
 #include
+#include
+#include
+#include
+
+#include
 #include
 #include
+#include
+#include
 
 namespace {
@@ -85,6 +94,15 @@
 std::vector<std::string> const NATION_SCHEMA = {
   "n_nationkey", "n_name", "n_regionkey", "n_comment"};
 std::vector<std::string> const REGION_SCHEMA = {"r_regionkey", "r_name", "r_comment"};
+std::unordered_map<std::string, std::vector<std::string> const> const SCHEMAS = {
+  {"orders", ORDERS_SCHEMA},
+  {"lineitem", LINEITEM_SCHEMA},
+  {"part", PART_SCHEMA},
+  {"partsupp", PARTSUPP_SCHEMA},
+  {"supplier", SUPPLIER_SCHEMA},
+  {"customer", CUSTOMER_SCHEMA},
+  {"nation", NATION_SCHEMA},
+  {"region", REGION_SCHEMA}};
 } // namespace
 
 cudf::table_view table_with_names::table() const { return tbl->view(); }
@@ -337,7 +355,7 @@ int32_t days_since_epoch(int year, int month, int day)
 
 void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
                                     std::vector<std::string> const& col_names,
-                                    parquet_device_buffer& source)
+                                    cuio_source_sink_pair& source)
 {
   CUDF_FUNC_RANGE();
   auto const stream = cudf::get_default_stream();
@@ -351,55 +369,124 @@ void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
   metadata.schema_info            = col_name_infos;
   auto const table_input_metadata = cudf::io::table_input_metadata{metadata};
 
-  // Declare a host and device buffer
-  std::vector<char> h_buffer;
-
+  auto est_size = static_cast<std::size_t>(estimate_size(table->view()));
+  constexpr auto PQ_MAX_TABLE_BYTES = 8ul << 30;  // 8GB
+  // TODO: best to get this limit from percent_of_free_device_memory(50) of device memory resource.
+  if (est_size > PQ_MAX_TABLE_BYTES) {
+    auto builder = cudf::io::chunked_parquet_writer_options::builder(source.make_sink_info());
+    builder.metadata(table_input_metadata);
+    auto const options = builder.build();
+    auto num_splits    = static_cast<cudf::size_type>(
+      std::ceil(static_cast<double>(est_size) / (PQ_MAX_TABLE_BYTES)));
+    std::vector<cudf::size_type> splits(num_splits - 1);
+    auto num_rows          = table->num_rows();
+    auto num_row_per_chunk = cudf::util::div_rounding_up_safe(num_rows, num_splits);
+    std::generate_n(splits.begin(), splits.size(), [num_row_per_chunk, i = 0]() mutable {
+      return (i += num_row_per_chunk);
+    });
+    std::vector<cudf::table_view> split_tables = cudf::split(table->view(), splits, stream);
+    auto writer = cudf::io::parquet_chunked_writer(options, stream);
+    for (auto const& chunk_table : split_tables) {
+      writer.write(chunk_table);
+    }
+    writer.close();
+    return;
+  }
   // Write parquet data to host buffer
-  auto builder =
-    cudf::io::parquet_writer_options::builder(cudf::io::sink_info(&h_buffer), table->view());
+  auto builder = cudf::io::parquet_writer_options::builder(source.make_sink_info(), table->view());
   builder.metadata(table_input_metadata);
   auto const options = builder.build();
-  cudf::io::write_parquet(options);
+  cudf::io::write_parquet(options, stream);
+}
 
-  // Copy host buffer to device buffer
-  source.d_buffer.resize(h_buffer.size(), stream);
-  CUDF_CUDA_TRY(cudaMemcpyAsync(
-    source.d_buffer.data(), h_buffer.data(), h_buffer.size(), cudaMemcpyDefault, stream.value()));
+inline auto make_managed_pool()
+{
+  return rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(
+    std::make_shared<rmm::mr::managed_memory_resource>(), rmm::percent_of_free_device_memory(50));
 }
 
 void generate_parquet_data_sources(double scale_factor,
                                    std::vector<std::string> const& table_names,
-                                   std::unordered_map<std::string, parquet_device_buffer>& sources)
+                                   std::unordered_map<std::string, cuio_source_sink_pair>& sources)
 {
   CUDF_FUNC_RANGE();
-  std::for_each(table_names.begin(), table_names.end(), [&](auto const& table_name) {
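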
-    sources[table_name] = parquet_device_buffer();
-  });
 
-  auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part(
-    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  // Set the memory resource to the managed pool
+  auto old_mr = cudf::get_current_device_resource();
+  // if already managed pool or managed, don't create new one.
+  using managed_pool_mr_t = decltype(make_managed_pool());
+  managed_pool_mr_t managed_pool_mr;
+  bool const is_managed =
+    dynamic_cast<rmm::mr::pool_memory_resource<rmm::mr::managed_memory_resource>*>(old_mr) or
+    dynamic_cast<rmm::mr::managed_memory_resource*>(old_mr);
+  if (!is_managed) {
+    std::cout << "Creating managed pool just for data generation\n";
+    managed_pool_mr = make_managed_pool();
+    cudf::set_current_device_resource(managed_pool_mr.get());
+    // drawback: if already pool takes 50% of free memory, we are left with 50% of 50% of free
+    // memory.
+  }
 
-  auto partsupp = cudf::datagen::generate_partsupp(
-    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  std::unordered_set<std::string> const requested_table_names = [&table_names]() {
+    if (table_names.empty()) {
+      return std::unordered_set<std::string>{
+        "orders", "lineitem", "part", "partsupp", "supplier", "customer", "nation", "region"};
+    }
+    return std::unordered_set<std::string>(table_names.begin(), table_names.end());
+  }();
+  std::for_each(
+    requested_table_names.begin(), requested_table_names.end(), [&](auto const& table_name) {
+      sources.emplace(table_name, cuio_source_sink_pair(io_type::HOST_BUFFER));
+    });
+  std::unordered_map<std::string, std::unique_ptr<cudf::table>> tables;
+
+  if (sources.count("orders") or sources.count("lineitem") or sources.count("part")) {
+    auto [orders, lineitem, part] = cudf::datagen::generate_orders_lineitem_part(
+      scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+    if (sources.count("orders")) {
+      write_to_parquet_device_buffer(orders, SCHEMAS.at("orders"), sources.at("orders"));
+      orders = {};
+    }
+    if (sources.count("part")) {
+      write_to_parquet_device_buffer(part, SCHEMAS.at("part"), sources.at("part"));
+      part = {};
+    }
+    if (sources.count("lineitem")) {
+      write_to_parquet_device_buffer(lineitem, SCHEMAS.at("lineitem"), sources.at("lineitem"));
+      lineitem = {};
+    }
+  }
+
+  if (sources.count("partsupp")) {
+    auto partsupp = cudf::datagen::generate_partsupp(
+      scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+    write_to_parquet_device_buffer(partsupp, SCHEMAS.at("partsupp"), sources.at("partsupp"));
+  }
 
-  auto supplier = cudf::datagen::generate_supplier(
-    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  if (sources.count("supplier")) {
+    auto supplier = cudf::datagen::generate_supplier(
+      scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+    write_to_parquet_device_buffer(supplier, SCHEMAS.at("supplier"), sources.at("supplier"));
+  }
 
-  auto customer = cudf::datagen::generate_customer(
-    scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+  if (sources.count("customer")) {
+    auto customer = cudf::datagen::generate_customer(
+      scale_factor, cudf::get_default_stream(), cudf::get_current_device_resource_ref());
+    write_to_parquet_device_buffer(customer, SCHEMAS.at("customer"), sources.at("customer"));
+  }
 
-  auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(),
-                                               cudf::get_current_device_resource_ref());
+  if (sources.count("nation")) {
+    auto nation = cudf::datagen::generate_nation(cudf::get_default_stream(),
+                                                 cudf::get_current_device_resource_ref());
+    write_to_parquet_device_buffer(nation, SCHEMAS.at("nation"), sources.at("nation"));
+  }
 
-  auto region = cudf::datagen::generate_region(cudf::get_default_stream(),
-                                               cudf::get_current_device_resource_ref());
+  if (sources.count("region")) {
+    auto region = cudf::datagen::generate_region(cudf::get_default_stream(),
+                                                 cudf::get_current_device_resource_ref());
+    write_to_parquet_device_buffer(region, SCHEMAS.at("region"), sources.at("region"));
+  }
 
-  write_to_parquet_device_buffer(std::move(orders), ORDERS_SCHEMA, sources["orders"]);
-  write_to_parquet_device_buffer(std::move(lineitem), LINEITEM_SCHEMA, sources["lineitem"]);
-  write_to_parquet_device_buffer(std::move(part), PART_SCHEMA, sources["part"]);
-  write_to_parquet_device_buffer(std::move(partsupp), PARTSUPP_SCHEMA, sources["partsupp"]);
-  write_to_parquet_device_buffer(std::move(customer), CUSTOMER_SCHEMA, sources["customer"]);
-  write_to_parquet_device_buffer(std::move(supplier), SUPPLIER_SCHEMA, sources["supplier"]);
-  write_to_parquet_device_buffer(std::move(nation), NATION_SCHEMA, sources["nation"]);
-  write_to_parquet_device_buffer(std::move(region), REGION_SCHEMA, sources["region"]);
+  // Restore the original memory resource
+  if (!is_managed) { cudf::set_current_device_resource(old_mr); }
 }

To make the chunking arithmetic in write_to_parquet_device_buffer concrete:
a table estimated at 20 GB against the 8 GB cap gives num_splits =
ceil(20 / 8) = 3. If that table has 10,000 rows, num_row_per_chunk =
div_rounding_up_safe(10000, 3) = 3334 and the generated split offsets are
{3334, 6668}, so cudf::split yields slices of 3334, 3334, and 3332 rows;
any remainder is simply absorbed by the last chunk.

diff --git a/cpp/benchmarks/ndsh/utilities.hpp b/cpp/benchmarks/ndsh/utilities.hpp
index 762e43deccf..cae07f86a98 100644
--- a/cpp/benchmarks/ndsh/utilities.hpp
+++ b/cpp/benchmarks/ndsh/utilities.hpp
@@ -14,6 +14,8 @@
  * limitations under the License.
  */
 
+#include "io/cuio_common.hpp"
+
 #include
 #include
 #include
@@ -196,24 +198,15 @@ std::tm make_tm(int year, int month, int day);
 int32_t days_since_epoch(int year, int month, int day);
 
 /**
- * @brief Struct representing a parquet device buffer
- */
-struct parquet_device_buffer {
-  parquet_device_buffer() : d_buffer{0, cudf::get_default_stream()} {};
-  cudf::io::source_info make_source_info() { return cudf::io::source_info(d_buffer); }
-  rmm::device_uvector<std::byte> d_buffer;
-};
-
-/**
- * @brief Write a `cudf::table` to a parquet device buffer
+ * @brief Write a `cudf::table` to a parquet cuio sink
  *
  * @param table The `cudf::table` to write
  * @param col_names The column names of the table
- * @param parquet_device_buffer The parquet device buffer to write the table to
+ * @param source The source sink pair to write the table to
  */
 void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
                                     std::vector<std::string> const& col_names,
-                                    parquet_device_buffer& source);
+                                    cuio_source_sink_pair& source);
 
 /**
  * @brief Generate NDS-H tables and write to parquet device buffers
 *
@@ -224,4 +217,4 @@ void write_to_parquet_device_buffer(std::unique_ptr<cudf::table> const& table,
  */
 void generate_parquet_data_sources(double scale_factor,
                                    std::vector<std::string> const& table_names,
-                                   std::unordered_map<std::string, parquet_device_buffer>& sources);
+                                   std::unordered_map<std::string, cuio_source_sink_pair>& sources);
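For completeness, the consumer side is all a query benchmark now has to do;
the sink/source pair hides whether the parquet bytes live in a file or a
host buffer. Condensed from q01.cpp above, with `scale_factor`,
`lineitem_cols`, and `lineitem_pred` as defined there:

    std::unordered_map<std::string, cuio_source_sink_pair> sources;
    generate_parquet_data_sources(scale_factor, {"lineitem"}, sources);
    auto lineitem = read_parquet(
      sources.at("lineitem").make_source_info(), lineitem_cols, std::move(lineitem_pred));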