Adding parquet transcoding example (#15420)
This PR adds a new example, `parquet_io`, to `libcudf/cpp/examples`, demonstrating how to read and write Parquet files with different column encodings (the same encoding for all columns, for now) and compression types; closes #15344. The example may be elaborated and/or evolved as needed. #15348 should be merged before this PR so that all of the CMake updates needed to build and run this example are in place.
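
For context, the core of the example is a single read → re-encode → write round trip. The condensed sketch below is illustrative only: it assumes the example's default encoding (DELTA_BINARY_PACKED), default compression (ZSTD), and default file names; the full source is in the diff below.

#include <cudf/io/parquet.hpp>

int main()
{
  // Read the input file into a table plus its metadata
  auto source  = cudf::io::source_info("example.parquet");
  auto in_opts = cudf::io::parquet_reader_options::builder(source).build();
  auto result  = cudf::io::read_parquet(in_opts);

  // Re-encode: apply one column encoding to every output column and pick a compression
  auto sink     = cudf::io::sink_info("output.parquet");
  auto builder  = cudf::io::parquet_writer_options::builder(sink, result.tbl->view());
  auto out_meta = cudf::io::table_input_metadata{result.metadata};
  for (auto& col : out_meta.column_metadata) {
    col.set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED);
  }
  builder.metadata(out_meta);
  auto out_opts = builder.build();
  out_opts.set_compression(cudf::io::compression_type::ZSTD);

  // Write the transcoded file
  cudf::io::write_parquet(out_opts);
  return 0;
}

The example then reads `output.parquet` back and checks the transcode with `cudf::left_anti_join` (identical tables yield an empty indices vector), timing both the write and the re-read.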

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Ray Douglass (https://github.com/raydouglass)

URL: #15420
mhaseeb123 authored May 13, 2024
1 parent 425a5da commit bff3015
Showing 6 changed files with 358 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ci/run_cudf_examples.sh
@@ -23,4 +23,7 @@ compute-sanitizer --tool memcheck custom_optimized names.csv
compute-sanitizer --tool memcheck custom_prealloc names.csv
compute-sanitizer --tool memcheck custom_with_malloc names.csv

compute-sanitizer --tool memcheck parquet_io
compute-sanitizer --tool memcheck parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD TRUE

exit ${EXITCODE}
1 change: 1 addition & 0 deletions cpp/examples/build.sh
@@ -59,3 +59,4 @@ build_example() {
build_example basic
build_example strings
build_example nested_types
build_example parquet_io
25 changes: 25 additions & 0 deletions cpp/examples/parquet_io/CMakeLists.txt
@@ -0,0 +1,25 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

cmake_minimum_required(VERSION 3.26.4)

include(../set_cuda_architecture.cmake)

# initialize cuda architecture
rapids_cuda_init_architectures(parquet_io)
rapids_cuda_set_architectures(RAPIDS)

project(
parquet_io
VERSION 0.0.1
LANGUAGES CXX CUDA
)

include(../fetch_dependencies.cmake)

# Configure your project here
add_executable(parquet_io parquet_io.cpp)
target_link_libraries(parquet_io PRIVATE cudf::cudf)
target_compile_features(parquet_io PRIVATE cxx_std_17)

install(TARGETS parquet_io DESTINATION bin/examples/libcudf)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf)
Binary file added cpp/examples/parquet_io/example.parquet
172 changes: 172 additions & 0 deletions cpp/examples/parquet_io/parquet_io.cpp
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "parquet_io.hpp"

/**
* @file parquet_io.cpp
* @brief Demonstrates usage of the libcudf APIs to read and write
* parquet files with different encodings and compression types
*
* The following encoding and compression types are demonstrated:
* Encoding Types: DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED,
* DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY
*
* Compression Types: NONE, AUTO, SNAPPY, LZ4, ZSTD
*
*/

/**
* @brief Read parquet input from file
*
* @param filepath path to input parquet file
* @return cudf::io::table_with_metadata
*/
cudf::io::table_with_metadata read_parquet(std::string filepath)
{
auto source_info = cudf::io::source_info(filepath);
auto builder = cudf::io::parquet_reader_options::builder(source_info);
auto options = builder.build();
return cudf::io::read_parquet(options);
}

/**
* @brief Write parquet output to file
*
* @param input table to write
* @param metadata metadata of input table read by parquet reader
* @param filepath path to output parquet file
* @param encoding column encoding applied to all output columns
* @param compression compression type for the output file
* @param stats_level optional page size stats level
*/
void write_parquet(cudf::table_view input,
cudf::io::table_metadata metadata,
std::string filepath,
cudf::io::column_encoding encoding,
cudf::io::compression_type compression,
std::optional<cudf::io::statistics_freq> stats_level)
{
// write the data for inspection
auto sink_info = cudf::io::sink_info(filepath);
auto builder = cudf::io::parquet_writer_options::builder(sink_info, input);
auto table_metadata = cudf::io::table_input_metadata{metadata};

std::for_each(table_metadata.column_metadata.begin(),
table_metadata.column_metadata.end(),
[=](auto& col_meta) { col_meta.set_encoding(encoding); });

builder.metadata(table_metadata);
auto options = builder.build();
options.set_compression(compression);
// Either use the input stats level or don't write stats
options.set_stats_level(stats_level.value_or(cudf::io::statistics_freq::STATISTICS_NONE));

// write parquet data
cudf::io::write_parquet(options);
}

/**
* @brief Main for the parquet_io example
*
* Command line parameters:
* 1. parquet input file name/path (default: "example.parquet")
* 2. parquet output file name/path (default: "output.parquet")
* 3. encoding type for columns (default: "DELTA_BINARY_PACKED")
* 4. compression type (default: "ZSTD")
* 5. optional: use page size stats metadata (default: "NO")
*
* Example invocation from directory `cudf/cpp/examples/parquet_io`:
* ./build/parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD
*
*/
int main(int argc, char const** argv)
{
std::string input_filepath;
std::string output_filepath;
cudf::io::column_encoding encoding;
cudf::io::compression_type compression;
std::optional<cudf::io::statistics_freq> page_stats;

switch (argc) {
case 1:
input_filepath = "example.parquet";
output_filepath = "output.parquet";
encoding = get_encoding_type("DELTA_BINARY_PACKED");
compression = get_compression_type("ZSTD");
break;
case 6: page_stats = get_page_size_stats(argv[5]); [[fallthrough]];
case 5:
input_filepath = argv[1];
output_filepath = argv[2];
encoding = get_encoding_type(argv[3]);
compression = get_compression_type(argv[4]);
break;
default:
throw std::runtime_error(
"Either provide all command-line arguments, or none to use defaults\n");
}

// Create and use a memory pool
bool is_pool_used = true;
auto resource = create_memory_resource(is_pool_used);
rmm::mr::set_current_device_resource(resource.get());

// Read input parquet file
// We do not time the initial read as it may include
// time for nvcomp, cufile loading and RMM growth
std::cout << std::endl << "Reading " << input_filepath << "..." << std::endl;
std::cout << "Note: Not timing the initial parquet read as it may include\n"
"times for nvcomp, cufile loading and RMM growth."
<< std::endl
<< std::endl;
auto [input, metadata] = read_parquet(input_filepath);

// Status string to indicate if page stats are set to be written or not
auto page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats";
// Write parquet file with the specified encoding and compression
std::cout << "Writing " << output_filepath << " with encoding, compression and "
<< page_stat_string << "..." << std::endl;

// `timer` is automatically started here
Timer timer;
write_parquet(input->view(), metadata, output_filepath, encoding, compression, page_stats);
timer.print_elapsed_millis();

// Read the parquet file written with encoding and compression
std::cout << "Reading " << output_filepath << "..." << std::endl;

// Reset the timer
timer.reset();
auto [transcoded_input, transcoded_metadata] = read_parquet(output_filepath);
timer.print_elapsed_millis();

// Check for validity
try {
// Left anti-join the original and transcoded tables:
// for identical tables, the result is an empty indices vector,
// while mismatched column types/counts throw an exception
auto const indices = cudf::left_anti_join(
input->view(), transcoded_input->view(), cudf::null_equality::EQUAL, resource.get());

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl;
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
std::cout << "Transcoding valid: false" << std::endl;
}

return 0;
}
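
As an aside (not part of this commit): `write_parquet` above applies one encoding to every column, but the same `table_input_metadata` API also allows per-column choices. A minimal sketch of that variation, reusing the `metadata` and `builder` objects from `write_parquet` and using hypothetical column indices and encodings:

// Sketch only: per-column encodings instead of one encoding for all columns.
// Assumes `metadata` and `builder` as in write_parquet(); indices and encodings are hypothetical.
auto table_metadata = cudf::io::table_input_metadata{metadata};
if (table_metadata.column_metadata.size() >= 2) {
  table_metadata.column_metadata[0].set_encoding(cudf::io::column_encoding::DICTIONARY);
  table_metadata.column_metadata[1].set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY);
}
builder.metadata(table_metadata);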
157 changes: 157 additions & 0 deletions cpp/examples/parquet_io/parquet_io.hpp
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/join.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_device.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <chrono>
#include <iostream>
#include <optional>
#include <string>

/**
* @brief Create memory resource for libcudf functions
*
* @param is_pool_used Whether to use a pool memory resource.
* @return Memory resource instance
*/
std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used)
{
auto cuda_mr = std::make_shared<rmm::mr::cuda_memory_resource>();
if (is_pool_used) {
return rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(
cuda_mr, rmm::percent_of_free_device_memory(50));
}
return cuda_mr;
}

/**
* @brief Get encoding type from the keyword
*
* @param name encoding keyword name
* @return corresponding column encoding type
*/
[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name)
{
using encoding_type = cudf::io::column_encoding;

static const std::unordered_map<std::string_view, cudf::io::column_encoding> map = {
{"DEFAULT", encoding_type::USE_DEFAULT},
{"DICTIONARY", encoding_type::DICTIONARY},
{"PLAIN", encoding_type::PLAIN},
{"DELTA_BINARY_PACKED", encoding_type::DELTA_BINARY_PACKED},
{"DELTA_LENGTH_BYTE_ARRAY", encoding_type::DELTA_LENGTH_BYTE_ARRAY},
{"DELTA_BYTE_ARRAY", encoding_type::DELTA_BYTE_ARRAY},
};

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) { return map.at(name); }
throw std::invalid_argument("FATAL: " + std::string(name) +
" is not a valid encoding type.\n\n"
"Available encoding types: DEFAULT, DICTIONARY, PLAIN,\n"
"DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,\n"
"DELTA_BYTE_ARRAY\n"
"\n"
"Exiting...\n");
}

/**
* @brief Get compression type from the keyword
*
* @param name compression keyword name
* @return corresponding compression type
*/
[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name)
{
using compression_type = cudf::io::compression_type;

static const std::unordered_map<std::string_view, cudf::io::compression_type> map = {
{"NONE", compression_type::NONE},
{"AUTO", compression_type::AUTO},
{"SNAPPY", compression_type::SNAPPY},
{"LZ4", compression_type::LZ4},
{"ZSTD", compression_type::ZSTD}};

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) { return map.at(name); }
throw std::invalid_argument("FATAL: " + std::string(name) +
" is not a valid compression type.\n\n"
"Available compression_type types: NONE, AUTO, SNAPPY,\n"
"LZ4, ZSTD\n"
"\n"
"Exiting...\n");
}

/**
* @brief Get the optional page size stat frequency from the keyword
*
* @param use_stats keyword affirmation string such as: Y, T, YES, TRUE, ON
* @return optional page statistics frequency set to full (STATISTICS_COLUMN)
*/
[[nodiscard]] std::optional<cudf::io::statistics_freq> get_page_size_stats(std::string use_stats)
{
std::transform(use_stats.begin(), use_stats.end(), use_stats.begin(), ::toupper);

// Check if the input string matches any of the following
if (not use_stats.compare("ON") or not use_stats.compare("TRUE") or
not use_stats.compare("YES") or not use_stats.compare("Y") or not use_stats.compare("T")) {
// Full column and offset indices - STATISTICS_COLUMN
return std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN);
}

return std::nullopt;
}

/**
* @brief Light-weight timer for parquet reader and writer instrumentation
*
* Timer object constructed from std::chrono, instrumenting at microseconds
* precision. Can display elapsed durations at milli and micro second
* scales. Timer starts at object construction.
*/
class Timer {
public:
using micros = std::chrono::microseconds;
using millis = std::chrono::milliseconds;

Timer() { reset(); }
void reset() { start_time = std::chrono::high_resolution_clock::now(); }
auto elapsed() { return (std::chrono::high_resolution_clock::now() - start_time); }
void print_elapsed_micros()
{
std::cout << "Elapsed Time: " << std::chrono::duration_cast<micros>(elapsed()).count()
<< "us\n\n";
}
void print_elapsed_millis()
{
std::cout << "Elapsed Time: " << std::chrono::duration_cast<millis>(elapsed()).count()
<< "ms\n\n";
}

private:
using time_point_t = std::chrono::time_point<std::chrono::high_resolution_clock>;
time_point_t start_time;
};
