diff --git a/cpp/examples/build.sh b/cpp/examples/build.sh index bde6ef7d69c..dce81fb1677 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -57,6 +57,7 @@ build_example() { } build_example basic +build_example tpch build_example strings build_example nested_types build_example parquet_io diff --git a/cpp/examples/parquet_io/parquet_io.cpp b/cpp/examples/parquet_io/parquet_io.cpp index 8be17db3781..274a2599189 100644 --- a/cpp/examples/parquet_io/parquet_io.cpp +++ b/cpp/examples/parquet_io/parquet_io.cpp @@ -16,6 +16,8 @@ #include "parquet_io.hpp" +#include "../utilities/timer.hpp" + /** * @file parquet_io.cpp * @brief Demonstrates usage of the libcudf APIs to read and write @@ -140,7 +142,7 @@ int main(int argc, char const** argv) << page_stat_string << ".." << std::endl; // `timer` is automatically started here - Timer timer; + cudf::examples::timer timer; write_parquet(input->view(), metadata, output_filepath, encoding, compression, page_stats); timer.print_elapsed_millis(); diff --git a/cpp/examples/parquet_io/parquet_io.hpp b/cpp/examples/parquet_io/parquet_io.hpp index d2fc359a2fe..e27cbec4fce 100644 --- a/cpp/examples/parquet_io/parquet_io.hpp +++ b/cpp/examples/parquet_io/parquet_io.hpp @@ -124,34 +124,3 @@ std::shared_ptr create_memory_resource(bool is_ return std::nullopt; } - -/** - * @brief Light-weight timer for parquet reader and writer instrumentation - * - * Timer object constructed from std::chrono, instrumenting at microseconds - * precision. Can display elapsed durations at milli and micro second - * scales. Timer starts at object construction. 
- */ -class Timer { - public: - using micros = std::chrono::microseconds; - using millis = std::chrono::milliseconds; - - Timer() { reset(); } - void reset() { start_time = std::chrono::high_resolution_clock::now(); } - auto elapsed() { return (std::chrono::high_resolution_clock::now() - start_time); } - void print_elapsed_micros() - { - std::cout << "Elapsed Time: " << std::chrono::duration_cast(elapsed()).count() - << "us\n\n"; - } - void print_elapsed_millis() - { - std::cout << "Elapsed Time: " << std::chrono::duration_cast(elapsed()).count() - << "ms\n\n"; - } - - private: - using time_point_t = std::chrono::time_point; - time_point_t start_time; -}; diff --git a/cpp/examples/tpch/CMakeLists.txt b/cpp/examples/tpch/CMakeLists.txt new file mode 100644 index 00000000000..1b91d07e148 --- /dev/null +++ b/cpp/examples/tpch/CMakeLists.txt @@ -0,0 +1,32 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +cmake_minimum_required(VERSION 3.26.4) + +include(../set_cuda_architecture.cmake) + +rapids_cuda_init_architectures(tpch_example) +rapids_cuda_set_architectures(RAPIDS) + +project( + tpch_example + VERSION 0.0.1 + LANGUAGES CXX CUDA +) + +include(../fetch_dependencies.cmake) + +add_executable(tpch_q1 q1.cpp) +target_link_libraries(tpch_q1 PRIVATE cudf::cudf) +target_compile_features(tpch_q1 PRIVATE cxx_std_17) + +add_executable(tpch_q5 q5.cpp) +target_link_libraries(tpch_q5 PRIVATE cudf::cudf) +target_compile_features(tpch_q5 PRIVATE cxx_std_17) + +add_executable(tpch_q6 q6.cpp) +target_link_libraries(tpch_q6 PRIVATE cudf::cudf) +target_compile_features(tpch_q6 PRIVATE cxx_std_17) + +add_executable(tpch_q9 q9.cpp) +target_link_libraries(tpch_q9 PRIVATE cudf::cudf) +target_compile_features(tpch_q9 PRIVATE cxx_std_17) diff --git a/cpp/examples/tpch/README.md b/cpp/examples/tpch/README.md new file mode 100644 index 00000000000..1ea71ae9824 --- /dev/null +++ b/cpp/examples/tpch/README.md @@ -0,0 +1,38 @@ +# TPC-H Inspired Examples + +Implements TPC-H queries using 
`libcudf`. We leverage the data generator (a wrapper around the official TPC-H datagen) from [Apache DataFusion](https://github.com/apache/datafusion) to generate data in Parquet format. + +## Requirements + +- Rust + +## Generating the Dataset + +1. Clone the DataFusion repository. +```bash +git clone git@github.com:apache/datafusion.git +``` + +2. Run the data generator. The data will be placed in a `data/` subdirectory. +```bash +cd datafusion/benchmarks/ +./bench.sh data tpch + +# For scale factor 10: +./bench.sh data tpch10 +``` + +## Running Queries + +1. Build the examples. +```bash +cd cpp/examples +./build.sh +``` +The TPC-H query binaries are built inside `cpp/examples/tpch/build`. + +2. Execute the queries. +```bash +./tpch/build/tpch_q1 +``` +A Parquet file named `q1.parquet` is generated, holding the results of the query. diff --git a/cpp/examples/tpch/q1.cpp b/cpp/examples/tpch/q1.cpp new file mode 100644 index 00000000000..1bdf039da4a --- /dev/null +++ b/cpp/examples/tpch/q1.cpp @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../utilities/timer.hpp" +#include "utils.hpp" + +#include +#include +#include + +/** + * @file q1.cpp + * @brief Implement query 1 of the TPC-H benchmark. 
+ * + * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; + * + * select + * l_returnflag, + * l_linestatus, + * sum(l_quantity) as sum_qty, + * sum(l_extendedprice) as sum_base_price, + * sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + * sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + * avg(l_quantity) as avg_qty, + * avg(l_extendedprice) as avg_price, + * avg(l_discount) as avg_disc, + * count(*) as count_order + * from + * lineitem + * where + * l_shipdate <= date '1998-09-02' + * group by + * l_returnflag, + * l_linestatus + * order by + * l_returnflag, + * l_linestatus; + */ + +/** + * @brief Calculate the discount price column + * + * @param discount The discount column + * @param extendedprice The extended price column + * @param stream The CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. + */ +[[nodiscard]] std::unique_ptr calc_disc_price( + cudf::column_view const& discount, + cudf::column_view const& extendedprice, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()) +{ + auto const one = cudf::numeric_scalar(1); + auto const one_minus_discount = + cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr); + auto const disc_price_type = cudf::data_type{cudf::type_id::FLOAT64}; + auto disc_price = cudf::binary_operation(extendedprice, + one_minus_discount->view(), + cudf::binary_operator::MUL, + disc_price_type, + stream, + mr); + return disc_price; +} + +/** + * @brief Calculate the charge column + * + * @param tax The tax column + * @param disc_price The discount price column + * @param stream The CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. 
+ */ +[[nodiscard]] std::unique_ptr calc_charge( + cudf::column_view const& tax, + cudf::column_view const& disc_price, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()) +{ + auto const one = cudf::numeric_scalar(1); + auto const one_plus_tax = + cudf::binary_operation(one, tax, cudf::binary_operator::ADD, tax.type(), stream, mr); + auto const charge_type = cudf::data_type{cudf::type_id::FLOAT64}; + auto charge = cudf::binary_operation( + disc_price, one_plus_tax->view(), cudf::binary_operator::MUL, charge_type, stream, mr); + return charge; +} + +int main(int argc, char const** argv) +{ + auto const args = parse_args(argc, argv); + + // Use a memory pool + auto resource = create_memory_resource(args.memory_resource_type); + rmm::mr::set_current_device_resource(resource.get()); + + cudf::examples::timer timer; + + // Define the column projections and filter predicate for `lineitem` table + std::vector const lineitem_cols = {"l_returnflag", + "l_linestatus", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_shipdate", + "l_orderkey", + "l_tax"}; + auto const shipdate_ref = cudf::ast::column_reference(std::distance( + lineitem_cols.begin(), std::find(lineitem_cols.begin(), lineitem_cols.end(), "l_shipdate"))); + auto shipdate_upper = + cudf::timestamp_scalar(days_since_epoch(1998, 9, 2), true); + auto const shipdate_upper_literal = cudf::ast::literal(shipdate_upper); + auto lineitem_pred = std::make_unique( + cudf::ast::ast_operator::LESS_EQUAL, shipdate_ref, shipdate_upper_literal); + + // Read out the `lineitem` table from parquet file + auto lineitem = + read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + + // Calculate the discount price and charge columns and append to lineitem table + auto disc_price = + calc_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice")); + auto charge = 
calc_charge(lineitem->column("l_tax"), disc_price->view()); + (*lineitem).append(disc_price, "disc_price").append(charge, "charge"); + + // Perform the group by operation + auto const groupedby_table = apply_groupby( + lineitem, + groupby_context_t{ + {"l_returnflag", "l_linestatus"}, + { + {"l_extendedprice", + {{cudf::aggregation::Kind::SUM, "sum_base_price"}, + {cudf::aggregation::Kind::MEAN, "avg_price"}}}, + {"l_quantity", + {{cudf::aggregation::Kind::SUM, "sum_qty"}, {cudf::aggregation::Kind::MEAN, "avg_qty"}}}, + {"l_discount", + { + {cudf::aggregation::Kind::MEAN, "avg_disc"}, + }}, + {"disc_price", + { + {cudf::aggregation::Kind::SUM, "sum_disc_price"}, + }}, + {"charge", + {{cudf::aggregation::Kind::SUM, "sum_charge"}, + {cudf::aggregation::Kind::COUNT_ALL, "count_order"}}}, + }}); + + // Perform the order by operation + auto const orderedby_table = apply_orderby(groupedby_table, + {"l_returnflag", "l_linestatus"}, + {cudf::order::ASCENDING, cudf::order::ASCENDING}); + + timer.print_elapsed_millis(); + + // Write query result to a parquet file + orderedby_table->to_parquet("q1.parquet"); + return 0; +} diff --git a/cpp/examples/tpch/q5.cpp b/cpp/examples/tpch/q5.cpp new file mode 100644 index 00000000000..e56850b94d6 --- /dev/null +++ b/cpp/examples/tpch/q5.cpp @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../utilities/timer.hpp" +#include "utils.hpp" + +#include +#include +#include + +/** + * @file q5.cpp + * @brief Implement query 5 of the TPC-H benchmark. + * + * create view customer as select * from '/tables/scale-1/customer.parquet'; + * create view orders as select * from '/tables/scale-1/orders.parquet'; + * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; + * create view supplier as select * from '/tables/scale-1/supplier.parquet'; + * create view nation as select * from '/tables/scale-1/nation.parquet'; + * create view region as select * from '/tables/scale-1/region.parquet'; + * + * select + * n_name, + * sum(l_extendedprice * (1 - l_discount)) as revenue + * from + * customer, + * orders, + * lineitem, + * supplier, + * nation, + * region + * where + * c_custkey = o_custkey + * and l_orderkey = o_orderkey + * and l_suppkey = s_suppkey + * and c_nationkey = s_nationkey + * and s_nationkey = n_nationkey + * and n_regionkey = r_regionkey + * and r_name = 'ASIA' + * and o_orderdate >= date '1994-01-01' + * and o_orderdate < date '1995-01-01' + * group by + * n_name + * order by + * revenue desc; + */ + +/** + * @brief Calculate the revenue column + * + * @param extendedprice The extended price column + * @param discount The discount column + * @param stream The CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. 
+ */ +[[nodiscard]] std::unique_ptr calc_revenue( + cudf::column_view const& extendedprice, + cudf::column_view const& discount, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()) +{ + auto const one = cudf::numeric_scalar(1); + auto const one_minus_discount = + cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr); + auto const revenue_type = cudf::data_type{cudf::type_id::FLOAT64}; + auto revenue = cudf::binary_operation(extendedprice, + one_minus_discount->view(), + cudf::binary_operator::MUL, + revenue_type, + stream, + mr); + return revenue; +} + +int main(int argc, char const** argv) +{ + auto const args = parse_args(argc, argv); + + // Use a memory pool + auto resource = create_memory_resource(args.memory_resource_type); + rmm::mr::set_current_device_resource(resource.get()); + + cudf::examples::timer timer; + + // Define the column projection and filter predicate for the `orders` table + std::vector const orders_cols = {"o_custkey", "o_orderkey", "o_orderdate"}; + auto const o_orderdate_ref = cudf::ast::column_reference(std::distance( + orders_cols.begin(), std::find(orders_cols.begin(), orders_cols.end(), "o_orderdate"))); + auto o_orderdate_lower = + cudf::timestamp_scalar(days_since_epoch(1994, 1, 1), true); + auto const o_orderdate_lower_limit = cudf::ast::literal(o_orderdate_lower); + auto const o_orderdate_pred_lower = cudf::ast::operation( + cudf::ast::ast_operator::GREATER_EQUAL, o_orderdate_ref, o_orderdate_lower_limit); + auto o_orderdate_upper = + cudf::timestamp_scalar(days_since_epoch(1995, 1, 1), true); + auto const o_orderdate_upper_limit = cudf::ast::literal(o_orderdate_upper); + auto const o_orderdate_pred_upper = + cudf::ast::operation(cudf::ast::ast_operator::LESS, o_orderdate_ref, o_orderdate_upper_limit); + auto orders_pred = std::make_unique( + cudf::ast::ast_operator::LOGICAL_AND, o_orderdate_pred_lower, 
o_orderdate_pred_upper); + + // Define the column projection and filter predicate for the `region` table + std::vector const region_cols = {"r_regionkey", "r_name"}; + auto const r_name_ref = cudf::ast::column_reference(std::distance( + region_cols.begin(), std::find(region_cols.begin(), region_cols.end(), "r_name"))); + auto r_name_value = cudf::string_scalar("ASIA"); + auto const r_name_literal = cudf::ast::literal(r_name_value); + auto region_pred = std::make_unique( + cudf::ast::ast_operator::EQUAL, r_name_ref, r_name_literal); + + // Read out the tables from parquet files + // while pushing down the column projections and filter predicates + auto const customer = + read_parquet(args.dataset_dir + "/customer.parquet", {"c_custkey", "c_nationkey"}); + auto const orders = + read_parquet(args.dataset_dir + "/orders.parquet", orders_cols, std::move(orders_pred)); + auto const lineitem = read_parquet(args.dataset_dir + "/lineitem.parquet", + {"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"}); + auto const supplier = + read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + auto const nation = + read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_regionkey", "n_name"}); + auto const region = + read_parquet(args.dataset_dir + "/region.parquet", region_cols, std::move(region_pred)); + + // Perform the joins + auto const join_a = apply_inner_join(region, nation, {"r_regionkey"}, {"n_regionkey"}); + auto const join_b = apply_inner_join(join_a, customer, {"n_nationkey"}, {"c_nationkey"}); + auto const join_c = apply_inner_join(join_b, orders, {"c_custkey"}, {"o_custkey"}); + auto const join_d = apply_inner_join(join_c, lineitem, {"o_orderkey"}, {"l_orderkey"}); + auto joined_table = + apply_inner_join(supplier, join_d, {"s_suppkey", "s_nationkey"}, {"l_suppkey", "n_nationkey"}); + + // Calculate and append the `revenue` column + auto revenue = + calc_revenue(joined_table->column("l_extendedprice"), 
joined_table->column("l_discount")); + (*joined_table).append(revenue, "revenue"); + + // Perform the groupby operation + auto const groupedby_table = + apply_groupby(joined_table, + groupby_context_t{{"n_name"}, + { + {"revenue", {{cudf::aggregation::Kind::SUM, "revenue"}}}, + }}); + + // Perform the order by operation + auto const orderedby_table = + apply_orderby(groupedby_table, {"revenue"}, {cudf::order::DESCENDING}); + + timer.print_elapsed_millis(); + + // Write query result to a parquet file + orderedby_table->to_parquet("q5.parquet"); + return 0; +} diff --git a/cpp/examples/tpch/q6.cpp b/cpp/examples/tpch/q6.cpp new file mode 100644 index 00000000000..f11b3d6ab3b --- /dev/null +++ b/cpp/examples/tpch/q6.cpp @@ -0,0 +1,137 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../utilities/timer.hpp" +#include "utils.hpp" + +#include +#include +#include + +/** + * @file q6.cpp + * @brief Implement query 6 of the TPC-H benchmark. 
+ * + * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; + * + * select + * sum(l_extendedprice * l_discount) as revenue + * from + * lineitem + * where + * l_shipdate >= date '1994-01-01' + * and l_shipdate < date '1995-01-01' + * and l_discount >= 0.05 + * and l_discount <= 0.07 + * and l_quantity < 24; + */ + +/** + * @brief Calculate the revenue column + * + * @param extendedprice The extended price column + * @param discount The discount column + * @param stream The CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. + */ +[[nodiscard]] std::unique_ptr calc_revenue( + cudf::column_view const& extendedprice, + cudf::column_view const& discount, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()) +{ + auto const revenue_type = cudf::data_type{cudf::type_id::FLOAT64}; + auto revenue = cudf::binary_operation( + extendedprice, discount, cudf::binary_operator::MUL, revenue_type, stream, mr); + return revenue; +} + +int main(int argc, char const** argv) +{ + auto const args = parse_args(argc, argv); + + // Use a memory pool + auto resource = create_memory_resource(args.memory_resource_type); + rmm::mr::set_current_device_resource(resource.get()); + + cudf::examples::timer timer; + + // Read out the `lineitem` table from parquet file + std::vector const lineitem_cols = { + "l_extendedprice", "l_discount", "l_shipdate", "l_quantity"}; + auto const shipdate_ref = cudf::ast::column_reference(std::distance( + lineitem_cols.begin(), std::find(lineitem_cols.begin(), lineitem_cols.end(), "l_shipdate"))); + auto shipdate_lower = + cudf::timestamp_scalar(days_since_epoch(1994, 1, 1), true); + auto const shipdate_lower_literal = cudf::ast::literal(shipdate_lower); + auto shipdate_upper = + cudf::timestamp_scalar(days_since_epoch(1995, 1, 1), true); + auto const 
shipdate_upper_literal = cudf::ast::literal(shipdate_upper); + auto const shipdate_pred_a = cudf::ast::operation( + cudf::ast::ast_operator::GREATER_EQUAL, shipdate_ref, shipdate_lower_literal); + auto const shipdate_pred_b = + cudf::ast::operation(cudf::ast::ast_operator::LESS, shipdate_ref, shipdate_upper_literal); + auto lineitem_pred = std::make_unique( + cudf::ast::ast_operator::LOGICAL_AND, shipdate_pred_a, shipdate_pred_b); + auto lineitem = + read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred)); + + // Cast the discount and quantity columns to float32 and append them to the lineitem table + auto discount_float = + cudf::cast(lineitem->column("l_discount"), cudf::data_type{cudf::type_id::FLOAT32}); + auto quantity_float = + cudf::cast(lineitem->column("l_quantity"), cudf::data_type{cudf::type_id::FLOAT32}); + + (*lineitem).append(discount_float, "l_discount_float").append(quantity_float, "l_quantity_float"); + + // Apply the discount and quantity filters on the float32 copies appended above + auto const discount_ref = cudf::ast::column_reference(lineitem->col_id("l_discount_float")); + auto const quantity_ref = cudf::ast::column_reference(lineitem->col_id("l_quantity_float")); + + auto discount_lower = cudf::numeric_scalar(0.05); + auto const discount_lower_literal = cudf::ast::literal(discount_lower); + auto discount_upper = cudf::numeric_scalar(0.07); + auto const discount_upper_literal = cudf::ast::literal(discount_upper); + auto quantity_upper = cudf::numeric_scalar(24); + auto const quantity_upper_literal = cudf::ast::literal(quantity_upper); + + auto const discount_pred_a = cudf::ast::operation( + cudf::ast::ast_operator::GREATER_EQUAL, discount_ref, discount_lower_literal); + + auto const discount_pred_b = + cudf::ast::operation(cudf::ast::ast_operator::LESS_EQUAL, discount_ref, discount_upper_literal); + auto const discount_pred = + cudf::ast::operation(cudf::ast::ast_operator::LOGICAL_AND, discount_pred_a, discount_pred_b); + auto const quantity_pred = + 
cudf::ast::operation(cudf::ast::ast_operator::LESS, quantity_ref, quantity_upper_literal); + auto const discount_quantity_pred = + cudf::ast::operation(cudf::ast::ast_operator::LOGICAL_AND, discount_pred, quantity_pred); + auto const filtered_table = apply_filter(lineitem, discount_quantity_pred); + + // Calculate the `revenue` column + auto revenue = + calc_revenue(filtered_table->column("l_extendedprice"), filtered_table->column("l_discount")); + + // Sum the `revenue` column + auto const revenue_view = revenue->view(); + auto const result_table = apply_reduction(revenue_view, cudf::aggregation::Kind::SUM, "revenue"); + + timer.print_elapsed_millis(); + + // Write query result to a parquet file + result_table->to_parquet("q6.parquet"); + return 0; +} diff --git a/cpp/examples/tpch/q9.cpp b/cpp/examples/tpch/q9.cpp new file mode 100644 index 00000000000..d3c218253f9 --- /dev/null +++ b/cpp/examples/tpch/q9.cpp @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../utilities/timer.hpp" +#include "utils.hpp" + +#include +#include +#include +#include +#include + +/** + * @file q9.cpp + * @brief Implement query 9 of the TPC-H benchmark. 
+ * + * create view part as select * from '/tables/scale-1/part.parquet'; + * create view supplier as select * from '/tables/scale-1/supplier.parquet'; + * create view lineitem as select * from '/tables/scale-1/lineitem.parquet'; + * create view partsupp as select * from '/tables/scale-1/partsupp.parquet'; + * create view orders as select * from '/tables/scale-1/orders.parquet'; + * create view nation as select * from '/tables/scale-1/nation.parquet'; + * + * select + * nation, + * o_year, + * sum(amount) as sum_profit + * from + * ( + * select + * n_name as nation, + * extract(year from o_orderdate) as o_year, + * l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount + * from + * part, + * supplier, + * lineitem, + * partsupp, + * orders, + * nation + * where + * s_suppkey = l_suppkey + * and ps_suppkey = l_suppkey + * and ps_partkey = l_partkey + * and p_partkey = l_partkey + * and o_orderkey = l_orderkey + * and s_nationkey = n_nationkey + * and p_name like '%green%' + * ) as profit + * group by + * nation, + * o_year + * order by + * nation, + * o_year desc; + */ + +/** + * @brief Calculate the amount column as extendedprice * (1 - discount) - supplycost * quantity + * + * @param discount The discount column + * @param extendedprice The extended price column + * @param supplycost The supply cost column + * @param quantity The quantity column + * @param stream The CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned column's device memory. 
+ */ +[[nodiscard]] std::unique_ptr calc_amount( + cudf::column_view const& discount, + cudf::column_view const& extendedprice, + cudf::column_view const& supplycost, + cudf::column_view const& quantity, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()) +{ + auto const one = cudf::numeric_scalar(1); + auto const one_minus_discount = + cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr); + auto const extendedprice_discounted_type = cudf::data_type{cudf::type_id::FLOAT64}; + auto const extendedprice_discounted = cudf::binary_operation(extendedprice, + one_minus_discount->view(), + cudf::binary_operator::MUL, + extendedprice_discounted_type, + stream, + mr); + auto const supplycost_quantity_type = cudf::data_type{cudf::type_id::FLOAT64}; + auto const supplycost_quantity = cudf::binary_operation( + supplycost, quantity, cudf::binary_operator::MUL, supplycost_quantity_type, stream, mr); + auto amount = cudf::binary_operation(extendedprice_discounted->view(), + supplycost_quantity->view(), + cudf::binary_operator::SUB, + extendedprice_discounted->type(), + stream, + mr); + return amount; +} + +int main(int argc, char const** argv) +{ + auto const args = parse_args(argc, argv); + + // Use a memory pool + auto resource = create_memory_resource(args.memory_resource_type); + rmm::mr::set_current_device_resource(resource.get()); + + cudf::examples::timer timer; + + // Read out the table from parquet files + auto const lineitem = read_parquet( + args.dataset_dir + "/lineitem.parquet", + {"l_suppkey", "l_partkey", "l_orderkey", "l_extendedprice", "l_discount", "l_quantity"}); + auto const nation = read_parquet(args.dataset_dir + "/nation.parquet", {"n_nationkey", "n_name"}); + auto const orders = + read_parquet(args.dataset_dir + "/orders.parquet", {"o_orderkey", "o_orderdate"}); + auto const part = read_parquet(args.dataset_dir + "/part.parquet", {"p_partkey", 
"p_name"}); + auto const partsupp = read_parquet(args.dataset_dir + "/partsupp.parquet", + {"ps_suppkey", "ps_partkey", "ps_supplycost"}); + auto const supplier = + read_parquet(args.dataset_dir + "/supplier.parquet", {"s_suppkey", "s_nationkey"}); + + // Generating the `profit` table + // Filter the part table using `p_name like '%green%'` + auto const p_name = part->column("p_name"); + auto const mask = + cudf::strings::like(cudf::strings_column_view(p_name), cudf::string_scalar("%green%")); + auto const part_filtered = apply_mask(part, mask); + + // Perform the joins + auto const join_a = apply_inner_join(supplier, nation, {"s_nationkey"}, {"n_nationkey"}); + auto const join_b = apply_inner_join(partsupp, join_a, {"ps_suppkey"}, {"s_suppkey"}); + auto const join_c = apply_inner_join(lineitem, part_filtered, {"l_partkey"}, {"p_partkey"}); + auto const join_d = apply_inner_join(orders, join_c, {"o_orderkey"}, {"l_orderkey"}); + auto const joined_table = + apply_inner_join(join_d, join_b, {"l_suppkey", "l_partkey"}, {"s_suppkey", "ps_partkey"}); + + // Calculate the `nation`, `o_year`, and `amount` columns + auto n_name = std::make_unique(joined_table->column("n_name")); + auto o_year = cudf::datetime::extract_year(joined_table->column("o_orderdate")); + auto amount = calc_amount(joined_table->column("l_discount"), + joined_table->column("l_extendedprice"), + joined_table->column("ps_supplycost"), + joined_table->column("l_quantity")); + + // Put together the `profit` table + std::vector> profit_columns; + profit_columns.push_back(std::move(n_name)); + profit_columns.push_back(std::move(o_year)); + profit_columns.push_back(std::move(amount)); + + auto profit_table = std::make_unique(std::move(profit_columns)); + auto const profit = std::make_unique( + std::move(profit_table), std::vector{"nation", "o_year", "amount"}); + + // Perform the groupby operation + auto const groupedby_table = apply_groupby( + profit, + groupby_context_t{{"nation", "o_year"}, + 
{{"amount", {{cudf::aggregation::Kind::SUM, "sum_profit"}}}}}); + + // Perform the orderby operation + auto const orderedby_table = apply_orderby( + groupedby_table, {"nation", "o_year"}, {cudf::order::ASCENDING, cudf::order::DESCENDING}); + + timer.print_elapsed_millis(); + + // Write query result to a parquet file + orderedby_table->to_parquet("q9.parquet"); + return 0; +} diff --git a/cpp/examples/tpch/utils.hpp b/cpp/examples/tpch/utils.hpp new file mode 100644 index 00000000000..e586da2c802 --- /dev/null +++ b/cpp/examples/tpch/utils.hpp @@ -0,0 +1,457 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +// RMM memory resource creation utilities +inline auto make_cuda() { return std::make_shared(); } +inline auto make_pool() +{ + return rmm::mr::make_owning_wrapper( + make_cuda(), rmm::percent_of_free_device_memory(50)); +} +inline auto make_managed() { return std::make_shared(); } +inline auto make_managed_pool() +{ + return rmm::mr::make_owning_wrapper( + make_managed(), rmm::percent_of_free_device_memory(50)); +} +inline std::shared_ptr create_memory_resource( + std::string const& mode) +{ + if (mode == "cuda") return make_cuda(); + if (mode == "pool") return make_pool(); + if (mode == "managed") return make_managed(); + if (mode == "managed_pool") return make_managed_pool(); + CUDF_FAIL("Unknown rmm_mode parameter: " + mode + + "\nExpecting: cuda, pool, managed, or managed_pool"); +} + +/** + * @brief A class to represent a table with column names attached + */ +class table_with_names { + public: + table_with_names(std::unique_ptr tbl, std::vector col_names) + : tbl(std::move(tbl)), col_names(std::move(col_names)) + { + } + /** + * @brief Return the table view + */ + [[nodiscard]] cudf::table_view table() const { return tbl->view(); } + /** + * @brief Return the column view for a given column name + * + * @param col_name The name of the column + */ + [[nodiscard]] cudf::column_view column(std::string const& col_name) const + { + return tbl->view().column(col_id(col_name)); + } + /** + * @brief Return the column names of the table + */ + [[nodiscard]] std::vector column_names() const { return col_names; } + /** + * @brief Translate a column name to a column index; throws std::runtime_error if not found + * + * @param col_name The name of the column + */ + [[nodiscard]] cudf::size_type col_id(std::string const& col_name) const + { + CUDF_FUNC_RANGE(); + auto it = std::find(col_names.begin(), 
col_names.end(), col_name); + if (it == col_names.end()) { throw std::runtime_error("Column not found"); } + return std::distance(col_names.begin(), it); + } + /** + * @brief Append a column to the table + * + * @param col The column to append; ownership is moved into the table, leaving `col` null + * @param col_name The name of the appended column + */ + table_with_names& append(std::unique_ptr& col, std::string const& col_name) + { + CUDF_FUNC_RANGE(); + auto cols = tbl->release(); + cols.push_back(std::move(col)); + tbl = std::make_unique(std::move(cols)); + col_names.push_back(col_name); + return (*this); + } + /** + * @brief Select a subset of columns from the table + * + * @param col_names The names of the columns to select + */ + [[nodiscard]] cudf::table_view select(std::vector const& col_names) const + { + CUDF_FUNC_RANGE(); + std::vector col_indices; + for (auto const& col_name : col_names) { + col_indices.push_back(col_id(col_name)); + } + return tbl->select(col_indices); + } + /** + * @brief Write the table to a parquet file + * + * @param filepath The path to the parquet file + */ + void to_parquet(std::string const& filepath) const + { + CUDF_FUNC_RANGE(); + auto const sink_info = cudf::io::sink_info(filepath); + cudf::io::table_metadata metadata; + metadata.schema_info = + std::vector(col_names.begin(), col_names.end()); + auto const table_input_metadata = cudf::io::table_input_metadata{metadata}; + auto builder = cudf::io::parquet_writer_options::builder(sink_info, tbl->view()); + builder.metadata(table_input_metadata); + auto const options = builder.build(); + cudf::io::write_parquet(options); + } + + private: + std::unique_ptr tbl; + std::vector col_names; +}; + +/** + * @brief Concatenate two vectors + * + * @param lhs The left vector + * @param rhs The right vector + */ +template +std::vector concat(std::vector const& lhs, std::vector const& rhs) +{ + std::vector result; + result.reserve(lhs.size() + rhs.size()); + std::copy(lhs.begin(), lhs.end(), std::back_inserter(result)); + 
std::copy(rhs.begin(), rhs.end(), std::back_inserter(result)); + return result; +} + +/** + * @brief Inner join two tables and gather the result + * + * @param left_input The left input table + * @param right_input The right input table + * @param left_on The columns to join on in the left table + * @param right_on The columns to join on in the right table + * @param compare_nulls The null equality policy + */ +[[nodiscard]] std::unique_ptr join_and_gather( + cudf::table_view const& left_input, + cudf::table_view const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls) +{ + CUDF_FUNC_RANGE(); + constexpr auto oob_policy = cudf::out_of_bounds_policy::DONT_CHECK; + auto const left_selected = left_input.select(left_on); + auto const right_selected = right_input.select(right_on); + auto const [left_join_indices, right_join_indices] = cudf::inner_join( + left_selected, right_selected, compare_nulls, rmm::mr::get_current_device_resource()); + + auto const left_indices_span = cudf::device_span{*left_join_indices}; + auto const right_indices_span = cudf::device_span{*right_join_indices}; + + auto const left_indices_col = cudf::column_view{left_indices_span}; + auto const right_indices_col = cudf::column_view{right_indices_span}; + + auto const left_result = cudf::gather(left_input, left_indices_col, oob_policy); + auto const right_result = cudf::gather(right_input, right_indices_col, oob_policy); + + auto joined_cols = left_result->release(); + auto right_cols = right_result->release(); + joined_cols.insert(joined_cols.end(), + std::make_move_iterator(right_cols.begin()), + std::make_move_iterator(right_cols.end())); + return std::make_unique(std::move(joined_cols)); +} + +/** + * @brief Apply an inner join operation to two tables + * + * @param left_input The left input table + * @param right_input The right input table + * @param left_on The columns to join on in the left table + * @param right_on The columns 
to join on in the right table + * @param compare_nulls The null equality policy + */ +[[nodiscard]] std::unique_ptr apply_inner_join( + std::unique_ptr const& left_input, + std::unique_ptr const& right_input, + std::vector const& left_on, + std::vector const& right_on, + cudf::null_equality compare_nulls = cudf::null_equality::EQUAL) +{ + CUDF_FUNC_RANGE(); + std::vector left_on_indices; + std::vector right_on_indices; + std::transform( + left_on.begin(), left_on.end(), std::back_inserter(left_on_indices), [&](auto const& col_name) { + return left_input->col_id(col_name); + }); + std::transform(right_on.begin(), + right_on.end(), + std::back_inserter(right_on_indices), + [&](auto const& col_name) { return right_input->col_id(col_name); }); + auto table = join_and_gather( + left_input->table(), right_input->table(), left_on_indices, right_on_indices, compare_nulls); + return std::make_unique( + std::move(table), concat(left_input->column_names(), right_input->column_names())); +} + +/** + * @brief Apply a filter predicated to a table + * + * @param table The input table + * @param predicate The filter predicate + */ +[[nodiscard]] std::unique_ptr apply_filter( + std::unique_ptr const& table, cudf::ast::operation const& predicate) +{ + CUDF_FUNC_RANGE(); + auto const boolean_mask = cudf::compute_column(table->table(), predicate); + auto result_table = cudf::apply_boolean_mask(table->table(), boolean_mask->view()); + return std::make_unique(std::move(result_table), table->column_names()); +} + +/** + * @brief Apply a boolean mask to a table + * + * @param table The input table + * @param mask The boolean mask + */ +[[nodiscard]] std::unique_ptr apply_mask( + std::unique_ptr const& table, std::unique_ptr const& mask) +{ + CUDF_FUNC_RANGE(); + auto result_table = cudf::apply_boolean_mask(table->table(), mask->view()); + return std::make_unique(std::move(result_table), table->column_names()); +} + +struct groupby_context_t { + std::vector keys; + std::unordered_map>> + 
values; +}; + +/** + * @brief Apply a groupby operation to a table + * + * @param table The input table + * @param ctx The groupby context + */ +[[nodiscard]] std::unique_ptr apply_groupby( + std::unique_ptr const& table, groupby_context_t const& ctx) +{ + CUDF_FUNC_RANGE(); + auto const keys = table->select(ctx.keys); + cudf::groupby::groupby groupby_obj(keys); + std::vector result_column_names; + result_column_names.insert(result_column_names.end(), ctx.keys.begin(), ctx.keys.end()); + std::vector requests; + for (auto& [value_col, aggregations] : ctx.values) { + requests.emplace_back(cudf::groupby::aggregation_request()); + for (auto& agg : aggregations) { + if (agg.first == cudf::aggregation::Kind::SUM) { + requests.back().aggregations.push_back( + cudf::make_sum_aggregation()); + } else if (agg.first == cudf::aggregation::Kind::MEAN) { + requests.back().aggregations.push_back( + cudf::make_mean_aggregation()); + } else if (agg.first == cudf::aggregation::Kind::COUNT_ALL) { + requests.back().aggregations.push_back( + cudf::make_count_aggregation()); + } else { + throw std::runtime_error("Unsupported aggregation"); + } + result_column_names.push_back(agg.second); + } + requests.back().values = table->column(value_col); + } + auto agg_results = groupby_obj.aggregate(requests); + std::vector> result_columns; + for (size_t i = 0; i < agg_results.first->num_columns(); i++) { + auto col = std::make_unique(agg_results.first->get_column(i)); + result_columns.push_back(std::move(col)); + } + for (size_t i = 0; i < agg_results.second.size(); i++) { + for (size_t j = 0; j < agg_results.second[i].results.size(); j++) { + result_columns.push_back(std::move(agg_results.second[i].results[j])); + } + } + auto result_table = std::make_unique(std::move(result_columns)); + return std::make_unique(std::move(result_table), result_column_names); +} + +/** + * @brief Apply an order by operation to a table + * + * @param table The input table + * @param sort_keys The sort keys + * 
@param sort_key_orders The sort key orders + */ +[[nodiscard]] std::unique_ptr apply_orderby( + std::unique_ptr const& table, + std::vector const& sort_keys, + std::vector const& sort_key_orders) +{ + CUDF_FUNC_RANGE(); + std::vector column_views; + for (auto& key : sort_keys) { + column_views.push_back(table->column(key)); + } + auto result_table = + cudf::sort_by_key(table->table(), cudf::table_view{column_views}, sort_key_orders); + return std::make_unique(std::move(result_table), table->column_names()); +} + +/** + * @brief Apply a reduction operation to a column + * + * @param column The input column + * @param agg_kind The aggregation kind + * @param col_name The name of the output column + */ +[[nodiscard]] std::unique_ptr apply_reduction( + cudf::column_view const& column, + cudf::aggregation::Kind const& agg_kind, + std::string const& col_name) +{ + CUDF_FUNC_RANGE(); + auto const agg = cudf::make_sum_aggregation(); + auto const result = cudf::reduce(column, *agg, column.type()); + cudf::size_type const len = 1; + auto col = cudf::make_column_from_scalar(*result, len); + std::vector> columns; + columns.push_back(std::move(col)); + auto result_table = std::make_unique(std::move(columns)); + std::vector col_names = {col_name}; + return std::make_unique(std::move(result_table), col_names); +} + +/** + * @brief Read a parquet file into a table + * + * @param filename The path to the parquet file + * @param columns The columns to read + * @param predicate The filter predicate to pushdown + */ +[[nodiscard]] std::unique_ptr read_parquet( + std::string const& filename, + std::vector const& columns = {}, + std::unique_ptr const& predicate = nullptr) +{ + CUDF_FUNC_RANGE(); + auto const source = cudf::io::source_info(filename); + auto builder = cudf::io::parquet_reader_options_builder(source); + if (!columns.empty()) { builder.columns(columns); } + if (predicate) { builder.filter(*predicate); } + auto const options = builder.build(); + auto table_with_metadata = 
/**
 * @brief Generate the `std::tm` structure from year, month, and day
 *
 * All other fields of the returned structure are zero.
 *
 * @param year The year
 * @param month The month (1 is January)
 * @param day The day of the month
 */
inline std::tm make_tm(int year, int month, int day)
{
  std::tm tm{};
  tm.tm_year = year - 1900;  // std::tm counts years from 1900
  tm.tm_mon  = month - 1;    // std::tm counts months from 0
  tm.tm_mday = day;
  return tm;
}

/**
 * @brief Calculate the number of days since the UNIX epoch
 *
 * The result is computed with pure proleptic Gregorian calendar arithmetic, so it does not
 * depend on the local time zone or on daylight saving time. Out-of-range months and days are
 * carried over, e.g. (1970, 13, 1) is 1971-01-01 and (1970, 1, 32) is 1970-02-01.
 *
 * @param year The year
 * @param month The month (1 is January)
 * @param day The day of the month
 * @return The number of days between 1970-01-01 and the given date (negative before the epoch)
 */
inline int32_t days_since_epoch(int year, int month, int day)
{
  // Bring the month into [1, 12], carrying whole years (floor division for negative months)
  int const month0 = month - 1;
  int const carry  = (month0 >= 0 ? month0 : month0 - 11) / 12;
  int const m      = month0 - carry * 12 + 1;
  int64_t y        = static_cast<int64_t>(year) + carry;

  // Count days from a March-based year so the leap day falls at the end of the year
  if (m <= 2) { --y; }
  int64_t const era = (y >= 0 ? y : y - 399) / 400;  // 400-year cycle index
  int64_t const yoe = y - era * 400;                 // year of era, [0, 399]
  int64_t const doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5;  // day of year of the 1st
  int64_t const doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;    // day of era

  // 146097 days per era; 719468 days from 0000-03-01 to 1970-01-01
  int64_t const days = era * 146097 + doe - 719468 + (static_cast<int64_t>(day) - 1);
  return static_cast<int32_t>(days);
}

/**
 * @brief Command line arguments of a TPC-H example
 */
struct tpch_example_args {
  /// First positional argument
  std::string dataset_dir;
  /// Second positional argument
  std::string memory_resource_type;
};

/**
 * @brief Parse command line arguments into a struct
 *
 * @param argc The number of command line arguments
 * @param argv The command line arguments
 * @return The parsed arguments
 * @throws std::runtime_error carrying a usage message if fewer than two arguments are given
 */
inline tpch_example_args parse_args(int argc, char const** argv)
{
  if (argc < 3) {
    // argv[0] is not guaranteed to be present
    std::string const program = (argc > 0 && argv[0] != nullptr) ? argv[0] : "tpch_example";
    std::string usage_message = "Usage: " + program +
                                " <dataset_dir> <memory_resource_type>\n The query result will be "
                                "saved to a parquet file named q{query_no}.parquet in the current "
                                "working directory ";
    throw std::runtime_error(usage_message);
  }
  tpch_example_args args;
  args.dataset_dir          = argv[1];
  args.memory_resource_type = argv[2];
  return args;
}
namespace cudf {
namespace examples {
/**
 * @brief Light-weight timer for measuring elapsed time.
 *
 * A timer object constructed from std::chrono. Can display elapsed durations
 * at milli and micro second scales. The timer starts at object construction.
 *
 * Intervals are measured with `std::chrono::steady_clock`, so adjustments of
 * the system wall clock cannot distort an elapsed duration or make it negative.
 */
class timer {
 public:
  using micros = std::chrono::microseconds;
  using millis = std::chrono::milliseconds;

  /// Construct the timer and start it
  timer() { reset(); }
  /// Restart the timer from the current instant
  void reset() { start_time = clock_type::now(); }
  /// Return the duration elapsed since construction or the last `reset()`
  auto elapsed() const { return (clock_type::now() - start_time); }
  /// Print the elapsed duration to stdout in microseconds
  void print_elapsed_micros() const
  {
    std::cout << "Elapsed Time: " << std::chrono::duration_cast<micros>(elapsed()).count()
              << "us\n\n";
  }
  /// Print the elapsed duration to stdout in milliseconds
  void print_elapsed_millis() const
  {
    std::cout << "Elapsed Time: " << std::chrono::duration_cast<millis>(elapsed()).count()
              << "ms\n\n";
  }

 private:
  using clock_type   = std::chrono::steady_clock;
  using time_point_t = std::chrono::time_point<clock_type>;
  time_point_t start_time;
};

}  // namespace examples
}  // namespace cudf