Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally return compression statistics from ORC and Parquet writers #13294

Merged
merged 25 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ add_library(
src/io/comp/nvcomp_adapter.cpp
src/io/comp/nvcomp_adapter.cu
src/io/comp/snap.cu
src/io/comp/statistics.cu
src/io/comp/uncomp.cpp
src/io/comp/unsnap.cu
src/io/csv/csv_gpu.cu
Expand Down
70 changes: 70 additions & 0 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ class orc_writer_options {
const table_input_metadata* _metadata = nullptr;
// Optional footer key_value_metadata
std::map<std::string, std::string> _user_data;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;

friend orc_writer_options_builder;

Expand Down Expand Up @@ -560,6 +562,16 @@ class orc_writer_options {
return _user_data;
}

/**
* @brief Returns a shared pointer to the user-provided compression statistics.
*
* @return Compression statistics
*/
[[nodiscard]] std::shared_ptr<writer_compression_statistics> get_compression_statistics() const
{
return _compression_stats;
}

// Setters

/**
Expand Down Expand Up @@ -648,6 +660,16 @@ class orc_writer_options {
{
_user_data = std::move(metadata);
}

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be updated after writing
*/
void set_compression_statistics(std::shared_ptr<writer_compression_statistics> comp_stats)
{
_compression_stats = std::move(comp_stats);
}
};

/**
Expand Down Expand Up @@ -775,6 +797,19 @@ class orc_writer_options_builder {
return *this;
}

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be filled once writer is done
* @return this for chaining
*/
orc_writer_options_builder& compression_statistics(
std::shared_ptr<writer_compression_statistics> const& comp_stats)
{
options._compression_stats = comp_stats;
return *this;
}

/**
* @brief move orc_writer_options member once it's built.
*/
Expand Down Expand Up @@ -829,6 +864,8 @@ class chunked_orc_writer_options {
const table_input_metadata* _metadata = nullptr;
// Optional footer key_value_metadata
std::map<std::string, std::string> _user_data;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;

friend chunked_orc_writer_options_builder;

Expand Down Expand Up @@ -919,6 +956,16 @@ class chunked_orc_writer_options {
return _user_data;
}

/**
* @brief Returns a shared pointer to the user-provided compression statistics.
*
* @return Compression statistics
*/
[[nodiscard]] std::shared_ptr<writer_compression_statistics> get_compression_statistics() const
{
return _compression_stats;
}

// Setters

/**
Expand Down Expand Up @@ -1000,6 +1047,16 @@ class chunked_orc_writer_options {
{
_user_data = std::move(metadata);
}

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be updated after writing
*/
void set_compression_statistics(std::shared_ptr<writer_compression_statistics> comp_stats)
{
_compression_stats = std::move(comp_stats);
}
};

/**
Expand Down Expand Up @@ -1113,6 +1170,19 @@ class chunked_orc_writer_options_builder {
return *this;
}

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be filled once writer is done
* @return this for chaining
*/
chunked_orc_writer_options_builder& compression_statistics(
std::shared_ptr<writer_compression_statistics> const& comp_stats)
{
options._compression_stats = comp_stats;
return *this;
}

/**
* @brief move chunked_orc_writer_options member once it's built.
*/
Expand Down
70 changes: 70 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ class parquet_writer_options {
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
std::optional<size_type> _max_page_fragment_size;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;

/**
* @brief Constructor from sink and table.
Expand Down Expand Up @@ -670,6 +672,16 @@ class parquet_writer_options {
*/
[[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; }

/**
* @brief Returns a shared pointer to the user-provided compression statistics.
*
* @return Compression statistics
*/
[[nodiscard]] std::shared_ptr<writer_compression_statistics> get_compression_statistics() const
{
return _compression_stats;
}

/**
* @brief Sets partitions.
*
Expand Down Expand Up @@ -777,6 +789,16 @@ class parquet_writer_options {
* @param size_rows Maximum page fragment size, in rows.
*/
void set_max_page_fragment_size(size_type size_rows);

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be updated after writing
*/
void set_compression_statistics(std::shared_ptr<writer_compression_statistics> comp_stats)
{
_compression_stats = std::move(comp_stats);
}
};

/**
Expand Down Expand Up @@ -983,6 +1005,19 @@ class parquet_writer_options_builder {
*/
parquet_writer_options_builder& max_page_fragment_size(size_type val);

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be filled once writer is done
* @return this for chaining
*/
parquet_writer_options_builder& compression_statistics(
std::shared_ptr<writer_compression_statistics> const& comp_stats)
{
options._compression_stats = comp_stats;
return *this;
}

/**
* @brief Sets whether int96 timestamps are written or not in parquet_writer_options.
*
Expand Down Expand Up @@ -1074,6 +1109,8 @@ class chunked_parquet_writer_options {
size_t _max_dictionary_size = default_max_dictionary_size;
// Maximum number of rows in a page fragment
std::optional<size_type> _max_page_fragment_size;
// Optional compression statistics
std::shared_ptr<writer_compression_statistics> _compression_stats;

/**
* @brief Constructor from sink.
Expand Down Expand Up @@ -1204,6 +1241,16 @@ class chunked_parquet_writer_options {
*/
[[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; }

/**
* @brief Returns a shared pointer to the user-provided compression statistics.
*
* @return Compression statistics
*/
[[nodiscard]] std::shared_ptr<writer_compression_statistics> get_compression_statistics() const
{
return _compression_stats;
}

/**
* @brief Sets metadata.
*
Expand Down Expand Up @@ -1297,6 +1344,16 @@ class chunked_parquet_writer_options {
*/
void set_max_page_fragment_size(size_type size_rows);

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be updated after writing
*/
void set_compression_statistics(std::shared_ptr<writer_compression_statistics> comp_stats)
{
_compression_stats = std::move(comp_stats);
}

/**
* @brief creates builder to build chunked_parquet_writer_options.
*
Expand Down Expand Up @@ -1503,6 +1560,19 @@ class chunked_parquet_writer_options_builder {
*/
chunked_parquet_writer_options_builder& max_page_fragment_size(size_type val);

/**
* @brief Sets the pointer to the output compression statistics.
*
* @param comp_stats Pointer to compression statistics to be filled once writer is done
* @return this for chaining
*/
chunked_parquet_writer_options_builder& compression_statistics(
std::shared_ptr<writer_compression_statistics> const& comp_stats)
{
options._compression_stats = comp_stats;
return *this;
}

/**
* @brief move chunked_parquet_writer_options member once it's built.
*/
Expand Down
98 changes: 98 additions & 0 deletions cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,104 @@ enum statistics_freq {
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
* @brief Statistics about compression performed by a writer.
*/
class writer_compression_statistics {
public:
/**
* @brief Default constructor
*/
writer_compression_statistics() = default;

/**
* @brief Constructor with initial values.
*
* @param num_compressed_bytes The number of bytes that were successfully compressed
* @param num_failed_bytes The number of bytes that failed to compress
* @param num_skipped_bytes The number of bytes that were skipped during compression
* @param num_compressed_output_bytes The number of bytes in the compressed output
*/
writer_compression_statistics(size_t num_compressed_bytes,
size_t num_failed_bytes,
size_t num_skipped_bytes,
size_t num_compressed_output_bytes)
: _num_compressed_bytes(num_compressed_bytes),
_num_failed_bytes(num_failed_bytes),
_num_skipped_bytes(num_skipped_bytes),
_num_compressed_output_bytes(num_compressed_output_bytes)
{
}

/**
* @brief Adds the values from another `writer_compression_statistics` object.
*
* @param other The other writer_compression_statistics object
* @return writer_compression_statistics& Reference to this object
*/
writer_compression_statistics& operator+=(const writer_compression_statistics& other) noexcept
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
{
_num_compressed_bytes += other._num_compressed_bytes;
_num_failed_bytes += other._num_failed_bytes;
_num_skipped_bytes += other._num_skipped_bytes;
_num_compressed_output_bytes += other._num_compressed_output_bytes;
return *this;
}

/**
* @brief Returns the number of bytes in blocks that were successfully compressed.
*
* This is the number of bytes that were actually compressed, not the size of the compressed
* output.
*
* @return size_t The number of bytes that were successfully compressed
*/
[[nodiscard]] auto num_compressed_bytes() const noexcept { return _num_compressed_bytes; }

/**
* @brief Returns the number of bytes in blocks that failed to compress.
*
* @return size_t The number of bytes that failed to compress
*/
[[nodiscard]] auto num_failed_bytes() const noexcept { return _num_failed_bytes; }

/**
* @brief Returns the number of bytes in blocks that were skipped during compression.
*
* @return size_t The number of bytes that were skipped during compression
*/
[[nodiscard]] auto num_skipped_bytes() const noexcept { return _num_skipped_bytes; }

/**
* @brief Returns the total size of compression inputs.
*
* @return size_t The total size of compression inputs
*/
[[nodiscard]] auto num_total_input_bytes() const noexcept
{
return num_compressed_bytes() + num_failed_bytes() + num_skipped_bytes();
}

/**
* @brief Returns the compression ratio for the successfully compressed blocks.
*
* Returns nan if there were no successfully compressed blocks.
*
* @return double The ratio between the size of the compression inputs and the size of the
* compressed output.
*/
[[nodiscard]] auto compression_ratio() const noexcept
{
return static_cast<double>(num_compressed_bytes()) / _num_compressed_output_bytes;
Copy link
Contributor

@ttnghia ttnghia May 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling for div by zero?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We let C++ handle it and return a nan. I added a note about this behavior in the docs of compression_ratio.
The alternative would be an optional, but I think nan fits better here.

}

private:
std::size_t _num_compressed_bytes = 0; ///< The number of bytes that were successfully compressed
std::size_t _num_failed_bytes = 0; ///< The number of bytes that failed to compress
std::size_t _num_skipped_bytes = 0; ///< The number of bytes that were skipped during compression
std::size_t _num_compressed_output_bytes = 0; ///< The number of bytes in the compressed output
};

/**
* @brief Control use of dictionary encoding for parquet writer
*/
Expand Down
16 changes: 15 additions & 1 deletion cpp/src/io/comp/gpuinflate.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -136,5 +137,18 @@ void gpu_snap(device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result> results,
rmm::cuda_stream_view stream);

/**
* @brief Aggregate results of compression into a single statistics object.
*
* @param inputs List of uncompressed input buffers
* @param results List of compression results
* @param stream CUDA stream to use
* @return writer_compression_statistics
*/
[[nodiscard]] writer_compression_statistics collect_compression_statistics(
device_span<device_span<uint8_t const> const> inputs,
device_span<compression_result const> results,
rmm::cuda_stream_view stream);

} // namespace io
} // namespace cudf
Loading