From bff301527d074cd8f98d1a2d8dddedbf8830dffd Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Mon, 13 May 2024 01:33:29 -0700 Subject: [PATCH] Adding parquet transcoding example (#15420) This PR adds a new example `parquet_io` to `libcudf/cpp/examples` demonstrating reading and writing parquet files with different column encodings (same for all columns for now) and compressions to close #15344. The example may be elaborated and/or evolved as needed. #15348 should be merged before this PR to get all CMake updates needed to successfully build and run this example. Authors: - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/cudf/pull/15420 --- ci/run_cudf_examples.sh | 3 + cpp/examples/build.sh | 1 + cpp/examples/parquet_io/CMakeLists.txt | 25 ++++ cpp/examples/parquet_io/example.parquet | Bin 0 -> 614 bytes cpp/examples/parquet_io/parquet_io.cpp | 172 ++++++++++++++++++++++++ cpp/examples/parquet_io/parquet_io.hpp | 157 +++++++++++++++++++++ 6 files changed, 358 insertions(+) create mode 100644 cpp/examples/parquet_io/CMakeLists.txt create mode 100644 cpp/examples/parquet_io/example.parquet create mode 100644 cpp/examples/parquet_io/parquet_io.cpp create mode 100644 cpp/examples/parquet_io/parquet_io.hpp diff --git a/ci/run_cudf_examples.sh b/ci/run_cudf_examples.sh index f3561bc595c..0819eacf636 100755 --- a/ci/run_cudf_examples.sh +++ b/ci/run_cudf_examples.sh @@ -23,4 +23,7 @@ compute-sanitizer --tool memcheck custom_optimized names.csv compute-sanitizer --tool memcheck custom_prealloc names.csv compute-sanitizer --tool memcheck custom_with_malloc names.csv +compute-sanitizer --tool memcheck parquet_io +compute-sanitizer --tool memcheck parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD TRUE + exit ${EXITCODE} diff --git 
a/cpp/examples/build.sh b/cpp/examples/build.sh index 9802c876930..bde6ef7d69c 100755 --- a/cpp/examples/build.sh +++ b/cpp/examples/build.sh @@ -59,3 +59,4 @@ build_example() { build_example basic build_example strings build_example nested_types +build_example parquet_io diff --git a/cpp/examples/parquet_io/CMakeLists.txt b/cpp/examples/parquet_io/CMakeLists.txt new file mode 100644 index 00000000000..d8e9205ffd4 --- /dev/null +++ b/cpp/examples/parquet_io/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +cmake_minimum_required(VERSION 3.26.4) + +include(../set_cuda_architecture.cmake) + +# initialize cuda architecture +rapids_cuda_init_architectures(parquet_io) +rapids_cuda_set_architectures(RAPIDS) + +project( + parquet_io + VERSION 0.0.1 + LANGUAGES CXX CUDA +) + +include(../fetch_dependencies.cmake) + +# Configure your project here +add_executable(parquet_io parquet_io.cpp) +target_link_libraries(parquet_io PRIVATE cudf::cudf) +target_compile_features(parquet_io PRIVATE cxx_std_17) + +install(TARGETS parquet_io DESTINATION bin/examples/libcudf) +install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf) diff --git a/cpp/examples/parquet_io/example.parquet b/cpp/examples/parquet_io/example.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f0fb5319cb040395b3d3841501e48de01fc0c7ba GIT binary patch literal 614 zcmZ9K!A^rf5Qew4MLc*hSpo?MveCvvB~Y7cOfOyYerfIgYttc#HSyWPKd?l}a z44=qdXtb2mKmW}9^92~+Ph1F~1JAoq5ki3q0J{3q0_{fY>$H8Wg6-x+g-xUTprqb2 zi7;#e*t!6EBMm>#%ps7ErWmGSc7O?61_Xxa!KS8Y6K&K{hXZh_Za3vz6f==ZK3b!I`s8Eo#bFlF6x~_VHF_1EP>5l=BB*O2iX$QnnuVdLQjQrHrBmuS z5%Ri}L`t`qiK5Dzm*Hd=TTfXAB=qNXufm%1Jdx*6MguSQD}pRa^1nvBcp)B_WD()m z#~h3CH6m1u2XrNOpB_h7x2V{IsZ*d-uGb(c>ww|^6s;lfAR;~qIUe+u9RL6T literal 0 HcmV?d00001 diff --git a/cpp/examples/parquet_io/parquet_io.cpp b/cpp/examples/parquet_io/parquet_io.cpp new file mode 100644 index 00000000000..8be17db3781 --- /dev/null +++ 
b/cpp/examples/parquet_io/parquet_io.cpp @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "parquet_io.hpp" + +/** + * @file parquet_io.cpp + * @brief Demonstrates usage of the libcudf APIs to read and write + * parquet file format with different encodings and compression types + * + * The following encoding and compression ztypes are demonstrated: + * Encoding Types: DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED, + * DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY + * + * Compression Types: NONE, AUTO, SNAPPY, LZ4, ZSTD + * + */ + +/** + * @brief Read parquet input from file + * + * @param filepath path to input parquet file + * @return cudf::io::table_with_metadata + */ +cudf::io::table_with_metadata read_parquet(std::string filepath) +{ + auto source_info = cudf::io::source_info(filepath); + auto builder = cudf::io::parquet_reader_options::builder(source_info); + auto options = builder.build(); + return cudf::io::read_parquet(options); +} + +/** + * @brief Write parquet output to file + * + * @param input table to write + * @param metadata metadata of input table read by parquet reader + * @param filepath path to output parquet file + * @param stats_level optional page size stats level + */ +void write_parquet(cudf::table_view input, + cudf::io::table_metadata metadata, + std::string filepath, + cudf::io::column_encoding encoding, + cudf::io::compression_type compression, + 
std::optional stats_level) +{ + // write the data for inspection + auto sink_info = cudf::io::sink_info(filepath); + auto builder = cudf::io::parquet_writer_options::builder(sink_info, input); + auto table_metadata = cudf::io::table_input_metadata{metadata}; + + std::for_each(table_metadata.column_metadata.begin(), + table_metadata.column_metadata.end(), + [=](auto& col_meta) { col_meta.set_encoding(encoding); }); + + builder.metadata(table_metadata); + auto options = builder.build(); + options.set_compression(compression); + // Either use the input stats level or don't write stats + options.set_stats_level(stats_level.value_or(cudf::io::statistics_freq::STATISTICS_NONE)); + + // write parquet data + cudf::io::write_parquet(options); +} + +/** + * @brief Main for nested_types examples + * + * Command line parameters: + * 1. parquet input file name/path (default: "example.parquet") + * 2. parquet output file name/path (default: "output.parquet") + * 3. encoding type for columns (default: "DELTA_BINARY_PACKED") + * 4. compression type (default: "ZSTD") + * 5. 
optional: use page size stats metadata (default: "NO") + * + * Example invocation from directory `cudf/cpp/examples/parquet_io`: + * ./build/parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD + * + */ +int main(int argc, char const** argv) +{ + std::string input_filepath; + std::string output_filepath; + cudf::io::column_encoding encoding; + cudf::io::compression_type compression; + std::optional page_stats; + + switch (argc) { + case 1: + input_filepath = "example.parquet"; + output_filepath = "output.parquet"; + encoding = get_encoding_type("DELTA_BINARY_PACKED"); + compression = get_compression_type("ZSTD"); + break; + case 6: page_stats = get_page_size_stats(argv[5]); [[fallthrough]]; + case 5: + input_filepath = argv[1]; + output_filepath = argv[2]; + encoding = get_encoding_type(argv[3]); + compression = get_compression_type(argv[4]); + break; + default: + throw std::runtime_error( + "Either provide all command-line arguments, or none to use defaults\n"); + } + + // Create and use a memory pool + bool is_pool_used = true; + auto resource = create_memory_resource(is_pool_used); + rmm::mr::set_current_device_resource(resource.get()); + + // Read input parquet file + // We do not want to time the initial read time as it may include + // time for nvcomp, cufile loading and RMM growth + std::cout << std::endl << "Reading " << input_filepath << "..." << std::endl; + std::cout << "Note: Not timing the initial parquet read as it may include\n" + "times for nvcomp, cufile loading and RMM growth." + << std::endl + << std::endl; + auto [input, metadata] = read_parquet(input_filepath); + + // Status string to indicate if page stats are set to be written or not + auto page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats"; + // Write parquet file with the specified encoding and compression + std::cout << "Writing " << output_filepath << " with encoding, compression and " + << page_stat_string << ".." 
<< std::endl; + + // `timer` is automatically started here + Timer timer; + write_parquet(input->view(), metadata, output_filepath, encoding, compression, page_stats); + timer.print_elapsed_millis(); + + // Read the parquet file written with encoding and compression + std::cout << "Reading " << output_filepath << "..." << std::endl; + + // Reset the timer + timer.reset(); + auto [transcoded_input, transcoded_metadata] = read_parquet(output_filepath); + timer.print_elapsed_millis(); + + // Check for validity + try { + // Left anti-join the original and transcoded tables + // identical tables should not throw an exception and + // return an empty indices vector + auto const indices = cudf::left_anti_join( + input->view(), transcoded_input->view(), cudf::null_equality::EQUAL, resource.get()); + + // No exception thrown, check indices + auto const valid = indices->size() == 0; + std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl; + } catch (std::exception& e) { + std::cerr << e.what() << std::endl << std::endl; + std::cout << "Transcoding valid: false" << std::endl; + } + + return 0; +} diff --git a/cpp/examples/parquet_io/parquet_io.hpp b/cpp/examples/parquet_io/parquet_io.hpp new file mode 100644 index 00000000000..d2fc359a2fe --- /dev/null +++ b/cpp/examples/parquet_io/parquet_io.hpp @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +/** + * @brief Create memory resource for libcudf functions + * + * @param pool Whether to use a pool memory resource. + * @return Memory resource instance + */ +std::shared_ptr create_memory_resource(bool is_pool_used) +{ + auto cuda_mr = std::make_shared(); + if (is_pool_used) { + return rmm::mr::make_owning_wrapper( + cuda_mr, rmm::percent_of_free_device_memory(50)); + } + return cuda_mr; +} + +/** + * @brief Get encoding type from the keyword + * + * @param name encoding keyword name + * @return corresponding column encoding type + */ +[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name) +{ + using encoding_type = cudf::io::column_encoding; + + static const std::unordered_map map = { + {"DEFAULT", encoding_type::USE_DEFAULT}, + {"DICTIONARY", encoding_type::DICTIONARY}, + {"PLAIN", encoding_type::PLAIN}, + {"DELTA_BINARY_PACKED", encoding_type::DELTA_BINARY_PACKED}, + {"DELTA_LENGTH_BYTE_ARRAY", encoding_type::DELTA_LENGTH_BYTE_ARRAY}, + {"DELTA_BYTE_ARRAY", encoding_type::DELTA_BYTE_ARRAY}, + }; + + std::transform(name.begin(), name.end(), name.begin(), ::toupper); + if (map.find(name) != map.end()) { return map.at(name); } + throw std::invalid_argument("FATAL: " + std::string(name) + + " is not a valid encoding type.\n\n" + "Available encoding types: DEFAULT, DICTIONARY, PLAIN,\n" + "DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,\n" + "DELTA_BYTE_ARRAY\n" + "\n" + "Exiting...\n"); +} + +/** + * @brief Get compression type from the keyword + * + * @param name compression keyword name + * @return corresponding compression type + */ +[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name) +{ + using compression_type = cudf::io::compression_type; + + static const std::unordered_map map = { + {"NONE", compression_type::NONE}, + {"AUTO", compression_type::AUTO}, 
+ {"SNAPPY", compression_type::SNAPPY}, + {"LZ4", compression_type::LZ4}, + {"ZSTD", compression_type::ZSTD}}; + + std::transform(name.begin(), name.end(), name.begin(), ::toupper); + if (map.find(name) != map.end()) { return map.at(name); } + throw std::invalid_argument("FATAL: " + std::string(name) + + " is not a valid compression type.\n\n" + "Available compression_type types: NONE, AUTO, SNAPPY,\n" + "LZ4, ZSTD\n" + "\n" + "Exiting...\n"); +} + +/** + * @brief Get the optional page size stat frequency from they keyword + * + * @param use_stats keyword affirmation string such as: Y, T, YES, TRUE, ON + * @return optional page statistics frequency set to full (STATISTICS_COLUMN) + */ +[[nodiscard]] std::optional get_page_size_stats(std::string use_stats) +{ + std::transform(use_stats.begin(), use_stats.end(), use_stats.begin(), ::toupper); + + // Check if the input string matches to any of the following + if (not use_stats.compare("ON") or not use_stats.compare("TRUE") or + not use_stats.compare("YES") or not use_stats.compare("Y") or not use_stats.compare("T")) { + // Full column and offset indices - STATISTICS_COLUMN + return std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN); + } + + return std::nullopt; +} + +/** + * @brief Light-weight timer for parquet reader and writer instrumentation + * + * Timer object constructed from std::chrono, instrumenting at microseconds + * precision. Can display elapsed durations at milli and micro second + * scales. Timer starts at object construction. 
+ */ +class Timer { + public: + using micros = std::chrono::microseconds; + using millis = std::chrono::milliseconds; + + Timer() { reset(); } + void reset() { start_time = std::chrono::high_resolution_clock::now(); } + auto elapsed() { return (std::chrono::high_resolution_clock::now() - start_time); } + void print_elapsed_micros() + { + std::cout << "Elapsed Time: " << std::chrono::duration_cast(elapsed()).count() + << "us\n\n"; + } + void print_elapsed_millis() + { + std::cout << "Elapsed Time: " << std::chrono::duration_cast(elapsed()).count() + << "ms\n\n"; + } + + private: + using time_point_t = std::chrono::time_point; + time_point_t start_time; +};