Skip to content

Commit

Permalink
Add TPC-H inspired examples for Libcudf (#16088)
Browse files Browse the repository at this point in the history
This PR adds a suite of `libcudf` examples with queries inspired from the TPC-H benchmarks. This PR also adds some reusable helper functions to perform operations such as joins, groubys, and orderbys for a cleaner and modular implementation of the queries.

# Queries implemented so far:
- [x] Query 1
- [X] Query 5 
- [X] Query 6 
- [X] Query 9

Authors:
  - Jayjeet Chakraborty (https://github.com/JayjeetAtGithub)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Karthikeyan (https://github.com/karthikeyann)

URL: #16088
  • Loading branch information
JayjeetAtGithub authored Jul 17, 2024
1 parent 8b767e5 commit 34dea6f
Show file tree
Hide file tree
Showing 11 changed files with 1,247 additions and 32 deletions.
1 change: 1 addition & 0 deletions cpp/examples/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ build_example() {
}

build_example basic
build_example tpch
build_example strings
build_example nested_types
build_example parquet_io
4 changes: 3 additions & 1 deletion cpp/examples/parquet_io/parquet_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "parquet_io.hpp"

#include "../utilities/timer.hpp"

/**
* @file parquet_io.cpp
* @brief Demonstrates usage of the libcudf APIs to read and write
Expand Down Expand Up @@ -140,7 +142,7 @@ int main(int argc, char const** argv)
<< page_stat_string << ".." << std::endl;

// `timer` is automatically started here
Timer timer;
cudf::examples::timer timer;
write_parquet(input->view(), metadata, output_filepath, encoding, compression, page_stats);
timer.print_elapsed_millis();

Expand Down
31 changes: 0 additions & 31 deletions cpp/examples/parquet_io/parquet_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,34 +124,3 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_

return std::nullopt;
}

/**
* @brief Light-weight timer for parquet reader and writer instrumentation
*
* Timer object constructed from std::chrono, instrumenting at microseconds
* precision. Can display elapsed durations at milli and micro second
* scales. Timer starts at object construction.
*/
class Timer {
public:
using micros = std::chrono::microseconds;
using millis = std::chrono::milliseconds;

Timer() { reset(); }
void reset() { start_time = std::chrono::high_resolution_clock::now(); }
auto elapsed() { return (std::chrono::high_resolution_clock::now() - start_time); }
void print_elapsed_micros()
{
std::cout << "Elapsed Time: " << std::chrono::duration_cast<micros>(elapsed()).count()
<< "us\n\n";
}
void print_elapsed_millis()
{
std::cout << "Elapsed Time: " << std::chrono::duration_cast<millis>(elapsed()).count()
<< "ms\n\n";
}

private:
using time_point_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
time_point_t start_time;
};
32 changes: 32 additions & 0 deletions cpp/examples/tpch/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

cmake_minimum_required(VERSION 3.26.4)

include(../set_cuda_architecture.cmake)

rapids_cuda_init_architectures(tpch_example)
rapids_cuda_set_architectures(RAPIDS)

project(
tpch_example
VERSION 0.0.1
LANGUAGES CXX CUDA
)

include(../fetch_dependencies.cmake)

add_executable(tpch_q1 q1.cpp)
target_link_libraries(tpch_q1 PRIVATE cudf::cudf)
target_compile_features(tpch_q1 PRIVATE cxx_std_17)

add_executable(tpch_q5 q5.cpp)
target_link_libraries(tpch_q5 PRIVATE cudf::cudf)
target_compile_features(tpch_q5 PRIVATE cxx_std_17)

add_executable(tpch_q6 q6.cpp)
target_link_libraries(tpch_q6 PRIVATE cudf::cudf)
target_compile_features(tpch_q6 PRIVATE cxx_std_17)

add_executable(tpch_q9 q9.cpp)
target_link_libraries(tpch_q9 PRIVATE cudf::cudf)
target_compile_features(tpch_q9 PRIVATE cxx_std_17)
38 changes: 38 additions & 0 deletions cpp/examples/tpch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# TPC-H Inspired Examples

Implements TPC-H queries using `libcudf`. We leverage the data generator (wrapper around official TPC-H datagen) from [Apache Datafusion](https://github.com/apache/datafusion) for generating data in Parquet format.

## Requirements

- Rust

## Generating the Dataset

1. Clone the datafusion repository.
```bash
git clone [email protected]:apache/datafusion.git
```

2. Run the data generator. The data will be placed in a `data/` subdirectory.
```bash
cd datafusion/benchmarks/
./bench.sh data tpch

# for scale factor 10,
./bench.sh data tpch10
```

## Running Queries

1. Build the examples.
```bash
cd cpp/examples
./build.sh
```
The TPC-H query binaries would be built inside `examples/tpch/build`.

2. Execute the queries.
```bash
./tpch/build/tpch_q1
```
A parquet file named `q1.parquet` would be generated holding the results of the query.
174 changes: 174 additions & 0 deletions cpp/examples/tpch/q1.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "../utilities/timer.hpp"
#include "utils.hpp"

#include <cudf/ast/expressions.hpp>
#include <cudf/column/column.hpp>
#include <cudf/scalar/scalar.hpp>

/**
* @file q1.cpp
* @brief Implement query 1 of the TPC-H benchmark.
*
* create view lineitem as select * from '/tables/scale-1/lineitem.parquet';
*
* select
* l_returnflag,
* l_linestatus,
* sum(l_quantity) as sum_qty,
* sum(l_extendedprice) as sum_base_price,
* sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
* sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
* avg(l_quantity) as avg_qty,
* avg(l_extendedprice) as avg_price,
* avg(l_discount) as avg_disc,
* count(*) as count_order
* from
* lineitem
* where
* l_shipdate <= date '1998-09-02'
* group by
* l_returnflag,
* l_linestatus
* order by
* l_returnflag,
* l_linestatus;
*/

/**
* @brief Calculate the discount price column
*
* @param discount The discount column
* @param extendedprice The extended price column
* @param stream The CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*/
[[nodiscard]] std::unique_ptr<cudf::column> calc_disc_price(
cudf::column_view const& discount,
cudf::column_view const& extendedprice,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
auto const one = cudf::numeric_scalar<double>(1);
auto const one_minus_discount =
cudf::binary_operation(one, discount, cudf::binary_operator::SUB, discount.type(), stream, mr);
auto const disc_price_type = cudf::data_type{cudf::type_id::FLOAT64};
auto disc_price = cudf::binary_operation(extendedprice,
one_minus_discount->view(),
cudf::binary_operator::MUL,
disc_price_type,
stream,
mr);
return disc_price;
}

/**
* @brief Calculate the charge column
*
* @param tax The tax column
* @param disc_price The discount price column
* @param stream The CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned column's device memory.
*/
[[nodiscard]] std::unique_ptr<cudf::column> calc_charge(
cudf::column_view const& tax,
cudf::column_view const& disc_price,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
auto const one = cudf::numeric_scalar<double>(1);
auto const one_plus_tax =
cudf::binary_operation(one, tax, cudf::binary_operator::ADD, tax.type(), stream, mr);
auto const charge_type = cudf::data_type{cudf::type_id::FLOAT64};
auto charge = cudf::binary_operation(
disc_price, one_plus_tax->view(), cudf::binary_operator::MUL, charge_type, stream, mr);
return charge;
}

int main(int argc, char const** argv)
{
auto const args = parse_args(argc, argv);

// Use a memory pool
auto resource = create_memory_resource(args.memory_resource_type);
rmm::mr::set_current_device_resource(resource.get());

cudf::examples::timer timer;

// Define the column projections and filter predicate for `lineitem` table
std::vector<std::string> const lineitem_cols = {"l_returnflag",
"l_linestatus",
"l_quantity",
"l_extendedprice",
"l_discount",
"l_shipdate",
"l_orderkey",
"l_tax"};
auto const shipdate_ref = cudf::ast::column_reference(std::distance(
lineitem_cols.begin(), std::find(lineitem_cols.begin(), lineitem_cols.end(), "l_shipdate")));
auto shipdate_upper =
cudf::timestamp_scalar<cudf::timestamp_D>(days_since_epoch(1998, 9, 2), true);
auto const shipdate_upper_literal = cudf::ast::literal(shipdate_upper);
auto lineitem_pred = std::make_unique<cudf::ast::operation>(
cudf::ast::ast_operator::LESS_EQUAL, shipdate_ref, shipdate_upper_literal);

// Read out the `lineitem` table from parquet file
auto lineitem =
read_parquet(args.dataset_dir + "/lineitem.parquet", lineitem_cols, std::move(lineitem_pred));

// Calculate the discount price and charge columns and append to lineitem table
auto disc_price =
calc_disc_price(lineitem->column("l_discount"), lineitem->column("l_extendedprice"));
auto charge = calc_charge(lineitem->column("l_tax"), disc_price->view());
(*lineitem).append(disc_price, "disc_price").append(charge, "charge");

// Perform the group by operation
auto const groupedby_table = apply_groupby(
lineitem,
groupby_context_t{
{"l_returnflag", "l_linestatus"},
{
{"l_extendedprice",
{{cudf::aggregation::Kind::SUM, "sum_base_price"},
{cudf::aggregation::Kind::MEAN, "avg_price"}}},
{"l_quantity",
{{cudf::aggregation::Kind::SUM, "sum_qty"}, {cudf::aggregation::Kind::MEAN, "avg_qty"}}},
{"l_discount",
{
{cudf::aggregation::Kind::MEAN, "avg_disc"},
}},
{"disc_price",
{
{cudf::aggregation::Kind::SUM, "sum_disc_price"},
}},
{"charge",
{{cudf::aggregation::Kind::SUM, "sum_charge"},
{cudf::aggregation::Kind::COUNT_ALL, "count_order"}}},
}});

// Perform the order by operation
auto const orderedby_table = apply_orderby(groupedby_table,
{"l_returnflag", "l_linestatus"},
{cudf::order::ASCENDING, cudf::order::ASCENDING});

timer.print_elapsed_millis();

// Write query result to a parquet file
orderedby_table->to_parquet("q1.parquet");
return 0;
}
Loading

0 comments on commit 34dea6f

Please sign in to comment.