From ab01a46434de1d76aa8f78ca2163a5df44a67760 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 3 May 2023 14:03:02 -0700 Subject: [PATCH 01/17] orc framework w/ stub data --- cpp/include/cudf/io/orc.hpp | 72 +++++++++++++++++++++++ cpp/include/cudf/io/types.hpp | 102 ++++++++++++++++++++++++++++++++- cpp/src/io/orc/writer_impl.cu | 41 ++++++++++--- cpp/src/io/orc/writer_impl.hpp | 15 ++++- 4 files changed, 216 insertions(+), 14 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index e3abbe6056f..e74beef526b 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -448,6 +448,8 @@ class orc_writer_options { const table_input_metadata* _metadata = nullptr; // Optional footer key_value_metadata std::map _user_data; + // Optional compression statistics + std::shared_ptr _compression_stats; friend orc_writer_options_builder; @@ -560,6 +562,17 @@ class orc_writer_options { return _user_data; } + /** + * @brief Returns compression statistics. + * + * @return Compression statistics + */ + [[nodiscard]] std::shared_ptr const& get_compression_statistics() + const + { + return _compression_stats; + } + // Setters /** @@ -648,6 +661,16 @@ class orc_writer_options { { _user_data = std::move(metadata); } + + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be updated after writing + */ + void set_compression_statistics(std::shared_ptr const& comp_stats) + { + _compression_stats = comp_stats; + } }; /** @@ -775,6 +798,19 @@ class orc_writer_options_builder { return *this; } + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be filled once writer is done + * @return this for chaining + */ + orc_writer_options_builder& compression_statistics( + std::shared_ptr const& comp_stats) + { + options._compression_stats = comp_stats; + return *this; + } + /** * @brief move orc_writer_options member once it's built. */ @@ -829,6 +865,8 @@ class chunked_orc_writer_options { const table_input_metadata* _metadata = nullptr; // Optional footer key_value_metadata std::map _user_data; + // Optional compression statistics + std::shared_ptr _compression_stats; friend chunked_orc_writer_options_builder; @@ -919,6 +957,17 @@ class chunked_orc_writer_options { return _user_data; } + /** + * @brief Returns compression statistics. + * + * @return Compression statistics + */ + [[nodiscard]] std::shared_ptr const& get_compression_statistics() + const + { + return _compression_stats; + } + // Setters /** @@ -1000,6 +1049,16 @@ class chunked_orc_writer_options { { _user_data = std::move(metadata); } + + /** + * @brief Sets compression statistics. + * + * @param comp_stats Pointer to compression statistics to be updated after writing each table + */ + void set_compression_statistics(std::shared_ptr const& comp_stats) + { + _compression_stats = comp_stats; + } }; /** @@ -1113,6 +1172,19 @@ class chunked_orc_writer_options_builder { return *this; } + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be filled once writer is done + * @return this for chaining + */ + chunked_orc_writer_options_builder& compression_statistics( + std::shared_ptr const& comp_stats) + { + options._compression_stats = comp_stats; + return *this; + } + /** * @brief move chunked_orc_writer_options member once it's built. */ diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 7426811a18d..fab5346a9ea 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -100,6 +100,101 @@ enum statistics_freq { STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP }; +/** + * @brief Statistics about compression performed by a writer. + */ +class writer_compression_statistics { + size_t _num_compressed_bytes = 0; ///< The number of bytes that were successfully compressed + size_t _num_failed_bytes = 0; ///< The number of bytes that failed to compress + size_t _num_skipped_bytes = 0; ///< The number of bytes that were skipped during compression + size_t _num_compressed_output_bytes = 0; ///< The number of bytes in the compressed output + + public: + /** + * @brief Default constructor + */ + writer_compression_statistics() = default; + + /** + * @brief Constructor with initial values. + * + * @param num_compressed_bytes The number of bytes that were successfully compressed + * @param num_failed_bytes The number of bytes that failed to compress + * @param num_skipped_bytes The number of bytes that were skipped during compression + * @param num_compressed_output_bytes The number of bytes in the compressed output + */ + writer_compression_statistics(size_t num_compressed_bytes, + size_t num_failed_bytes, + size_t num_skipped_bytes, + size_t num_compressed_output_bytes) + : _num_compressed_bytes(num_compressed_bytes), + _num_failed_bytes(num_failed_bytes), + _num_skipped_bytes(num_skipped_bytes), + _num_compressed_output_bytes(num_compressed_output_bytes) + { + } + + /** + * @brief Adds the values from another `writer_compression_statistics` object. + * + * @param other The other writer_compression_statistics object + * @return writer_compression_statistics& Reference to this object + */ + writer_compression_statistics& operator+=(const writer_compression_statistics& other) noexcept + { + _num_compressed_bytes += other._num_compressed_bytes; + _num_failed_bytes += other._num_failed_bytes; + _num_skipped_bytes += other._num_skipped_bytes; + _num_compressed_output_bytes += other._num_compressed_output_bytes; + return *this; + } + + /** + * @brief Returns the number of bytes in blocks that were successfully compressed. + * + * This is the number of bytes that were actually compressed, not the size of the compressed + * output. + * + * @return size_t The number of bytes that were successfully compressed + */ + [[nodiscard]] auto num_compressed_bytes() const noexcept { return _num_compressed_bytes; } + + /** + * @brief Returns the number of bytes in blocks that failed to compress. + * + * @return size_t The number of bytes that failed to compress + */ + [[nodiscard]] auto num_failed_bytes() const noexcept { return _num_failed_bytes; } + + /** + * @brief Returns the number of bytes in blocks that were skipped during compression. + * + * @return size_t The number of bytes that were skipped during compression + */ + [[nodiscard]] auto num_skipped_bytes() const noexcept { return _num_skipped_bytes; } + + /** + * @brief Returns the total size of compression inputs. + * + * @return size_t The total size of compression inputs + */ + [[nodiscard]] auto num_total_input_bytes() const noexcept + { + return num_compressed_bytes() + num_failed_bytes() + num_skipped_bytes(); + } + + /** + * @brief Returns the compression ratio for the successfully compressed blocks. + * + * @return double The ratio between the size of the compression inputs and the size of the + * compressed output. + */ + [[nodiscard]] auto compression_ratio() const noexcept + { + return static_cast(num_compressed_bytes()) / _num_compressed_output_bytes; + } +}; + /** * @brief Control use of dictionary encoding for parquet writer */ @@ -169,8 +264,8 @@ struct host_buffer { }; /** - * @brief Returns `true` if the type is byte-like, meaning it is reasonable to pass as a pointer to - * bytes. + * @brief Returns `true` if the type is byte-like, meaning it is reasonable to pass as a pointer + * to bytes. * * @tparam T The representation type * @return `true` if the type is considered a byte-like type @@ -634,7 +729,8 @@ class column_in_metadata { [[nodiscard]] bool nullable() const { return _nullable.value(); } /** - * @brief If this is the metadata of a list column, returns whether it is to be encoded as a map. + * @brief If this is the metadata of a list column, returns whether it is to be encoded as a + * map. * * @return Boolean indicating whether this column is to be encoded as a map */ diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index fef1bb23733..57bcf2446fc 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1543,7 +1543,7 @@ void write_index_stream(int32_t stripe_id, host_2dspan enc_streams, host_2dspan strm_desc, host_span comp_res, - std::vector const& rg_stats, + host_span rg_stats, StripeInformation* stripe, orc_streams* streams, CompressionKind compression_kind, @@ -2187,6 +2187,7 @@ std::unique_ptr make_table_meta(table_view const& input) * @param compression_kind The compression kind * @param compression_blocksize The block size used for compression * @param stats_freq Column statistics granularity type for parquet/orc writers + * @param collect_compression_stats Flag to indicate if compression statistics should be collected * @param write_mode Flag to indicate if there is only a single table write * @param out_sink Sink for writing data * @param stream CUDA stream used for device memory operations and kernel launches @@ -2200,6 +2201,7 @@ auto convert_table_to_orc_data(table_view const& input, CompressionKind compression_kind, size_t compression_blocksize, statistics_freq stats_freq, + bool collect_compression_stats, single_write_mode write_mode, data_sink const& out_sink, rmm::cuda_stream_view stream) @@ -2280,6 +2282,7 @@ auto convert_table_to_orc_data(table_view const& input, hostdevice_vector{}, // comp_results std::move(strm_descs), intermediate_statistics{stream}, + writer_compression_statistics{}, std::move(streams), std::move(stripes), std::move(stripe_dict), @@ -2324,6 +2327,7 @@ auto convert_table_to_orc_data(table_view const& input, // Compress the data streams rmm::device_uvector compressed_data(compressed_bfr_size, stream); hostdevice_vector comp_results(num_compressed_blocks, stream); + writer_compression_statistics compression_stats; thrust::fill(rmm::exec_policy(stream), comp_results.d_begin(), comp_results.d_end(), @@ -2346,6 +2350,11 @@ auto convert_table_to_orc_data(table_view const& input, strm_descs.device_to_host(stream); comp_results.device_to_host(stream, true); + + // TODO populate compression statistics + if (collect_compression_stats) { + compression_stats = writer_compression_statistics{1, 2, 3, 4}; + } } auto intermediate_stats = gather_statistic_blobs(stats_freq, orc_table, segmentation, stream); @@ -2357,6 +2366,7 @@ auto convert_table_to_orc_data(table_view const& input, std::move(comp_results), std::move(strm_descs), std::move(intermediate_stats), + std::move(compression_stats), std::move(streams), std::move(stripes), std::move(stripe_dict), @@ -2374,6 +2384,7 @@ writer::impl::impl(std::unique_ptr sink, _row_index_stride{options.get_row_index_stride()}, _compression_kind(to_orc_compression(options.get_compression())), _compression_blocksize(compression_block_size(_compression_kind)), + _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _single_write_mode(mode), _kv_meta(options.get_key_value_metadata()), @@ -2394,6 +2405,7 @@ writer::impl::impl(std::unique_ptr sink, _row_index_stride{options.get_row_index_stride()}, _compression_kind(to_orc_compression(options.get_compression())), _compression_blocksize(compression_block_size(_compression_kind)), + _compression_statistics(options.get_compression_statistics()), _stats_freq(options.get_statistics_freq()), _single_write_mode(mode), _kv_meta(options.get_key_value_metadata()), @@ -2431,6 +2443,7 @@ void writer::impl::write(table_view const& input) comp_results, strm_descs, intermediate_stats, + compression_stats, streams, stripes, stripe_dict, /* unused, but its data will be accessed via pointer later */ @@ -2444,6 +2457,7 @@ void writer::impl::write(table_view const& input) _compression_kind, _compression_blocksize, _stats_freq, + _compression_statistics != nullptr, _single_write_mode, *_out_sink, _stream); @@ -2462,13 +2476,27 @@ void writer::impl::write(table_view const& input) compressed_data, comp_results, strm_descs, - intermediate_stats, + intermediate_stats.rowgroup_blobs, streams, stripes, bounce_buffer); // Update data into the footer. This needs to be called even when num_rows==0. add_table_to_footer_data(orc_table, stripes); + + // Update file-level and compression statistics + update_statistics(orc_table.num_rows(), intermediate_stats, compression_stats); +} + +void writer::impl::update_statistics(size_type num_rows, + intermediate_statistics& intermediate_stats, // why not const? + writer_compression_statistics const& compression_stats) +{ + if (intermediate_stats.stripe_stat_chunks.size() > 0) { + _persisted_stripe_statistics.persist(num_rows, _single_write_mode, intermediate_stats, _stream); + } + + if (_compression_statistics != nullptr) { *_compression_statistics += compression_stats; } } void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, @@ -2477,18 +2505,13 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, device_span compressed_data, host_span comp_results, host_2dspan strm_descs, - intermediate_statistics& intermediate_stats, + host_span rg_stats, orc_streams& streams, host_span stripes, host_span bounce_buffer) { if (orc_table.num_rows() == 0) { return; } - if (intermediate_stats.stripe_stat_chunks.size() > 0) { - _persisted_stripe_statistics.persist( - orc_table.num_rows(), _single_write_mode, intermediate_stats, _stream); - } - // Write stripes std::vector> write_tasks; for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { @@ -2506,7 +2529,7 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, enc_data.streams, strm_descs, comp_results, - intermediate_stats.rowgroup_blobs, + rg_stats, &stripe, &streams, _compression_kind, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index cdcc092549a..4a794059a38 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -299,7 +299,7 @@ class writer::impl { * @param[in] compressed_data Compressed stream data * @param[in] comp_results Status of data compression * @param[in] strm_descs List of stream descriptors - * @param[in,out] intermediate_stats Statistics data stored between calls to write + * @param[in] rg_stats row group level statistics * @param[in,out] streams List of stream descriptors * @param[in,out] stripes List of stripe description * @param[out] bounce_buffer Temporary host output buffer @@ -310,7 +310,7 @@ class writer::impl { device_span compressed_data, host_span comp_results, host_2dspan strm_descs, - intermediate_statistics& intermediate_stats, + host_span rg_stats, orc_streams& streams, host_span stripes, host_span bounce_buffer); @@ -324,6 +324,16 @@ class writer::impl { void add_table_to_footer_data(orc_table_view const& orc_table, std::vector& stripes); + /** + * @brief Update writer-level statistics with data from the current table. + * + * @param num_rows Number of rows in the current table. + * @param single_table_stats Statistics data from the current table. + */ + void update_statistics(size_type num_rows, + intermediate_statistics& single_table_stats, + writer_compression_statistics const& compression_stats); + private: // CUDA stream. rmm::cuda_stream_view const _stream; @@ -333,6 +343,7 @@ class writer::impl { size_type const _row_index_stride; CompressionKind const _compression_kind; size_t const _compression_blocksize; + std::shared_ptr _compression_statistics; statistics_freq const _stats_freq; single_write_mode const _single_write_mode; // Special parameter only used by `write()` to // indicate that we are guaranteeing a single table From e48e965bfaff915b5f4b320eafe8f0fa67ee7303 Mon Sep 17 00:00:00 2001 From: vuule Date: Wed, 3 May 2023 15:02:11 -0700 Subject: [PATCH 02/17] move stub to gpuinflate --- cpp/src/io/comp/gpuinflate.hpp | 10 ++++++++- cpp/src/io/orc/orc_gpu.hpp | 36 ++++++++++++++++------------- cpp/src/io/orc/stripe_enc.cu | 28 ++++++++++++++--------- cpp/src/io/orc/writer_impl.cu | 41 +++++++++++++++++----------------- cpp/src/io/orc/writer_impl.hpp | 2 +- 5 files changed, 69 insertions(+), 48 deletions(-) diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 1b45a31b13b..9526063cb12 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ #pragma once +#include #include #include @@ -136,5 +137,12 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); +inline writer_compression_statistics collect_compression_statistics( + device_span const> inputs, + device_span results) +{ + return writer_compression_statistics{3, 3, 2, 2}; +} + } // namespace io } // namespace cudf diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 353affac6f2..6059d6adf16 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -16,17 +16,18 @@ #pragma once -#include - #include "orc.hpp" +#include +#include +#include + +#include #include +#include #include #include #include -#include -#include -#include #include @@ -350,21 +351,26 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression * @param[in] comp_block_align Required alignment for compressed blocks + * @param[in] collect_statistics Whether to collect compression statistics * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in,out] enc_streams chunk streams device array [column][rowgroup] * @param[out] comp_res Per-block compression status * @param[in] stream CUDA stream used for device memory operations and kernel launches + * + * @return Compression statistics (if requested) */ -void CompressOrcDataStreams(uint8_t* compressed_data, - uint32_t num_compressed_blocks, - CompressionKind compression, - uint32_t comp_blk_size, - uint32_t max_comp_blk_size, - uint32_t comp_block_align, - device_2dspan strm_desc, - device_2dspan enc_streams, - device_span comp_res, - rmm::cuda_stream_view stream); +std::optional CompressOrcDataStreams( + uint8_t* compressed_data, + uint32_t num_compressed_blocks, + CompressionKind compression, + uint32_t comp_blk_size, + uint32_t max_comp_blk_size, + uint32_t comp_block_align, + bool collect_statistics, + device_2dspan strm_desc, + device_2dspan enc_streams, + device_span comp_res, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for initializing dictionary chunks diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index b52075e4c28..1a09ad9d3fa 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1291,16 +1291,18 @@ void CompactOrcDataStreams(device_2dspan strm_desc, gpuCompactOrcDataStreams<<>>(strm_desc, enc_streams); } -void CompressOrcDataStreams(uint8_t* compressed_data, - uint32_t num_compressed_blocks, - CompressionKind compression, - uint32_t comp_blk_size, - uint32_t max_comp_blk_size, - uint32_t comp_block_align, - device_2dspan strm_desc, - device_2dspan enc_streams, - device_span comp_res, - rmm::cuda_stream_view stream) +std::optional CompressOrcDataStreams( + uint8_t* compressed_data, + uint32_t num_compressed_blocks, + CompressionKind compression, + uint32_t comp_blk_size, + uint32_t max_comp_blk_size, + uint32_t comp_block_align, + bool collect_statistics, + device_2dspan strm_desc, + device_2dspan enc_streams, + device_span comp_res, + rmm::cuda_stream_view stream) { rmm::device_uvector> comp_in(num_compressed_blocks, stream); rmm::device_uvector> comp_out(num_compressed_blocks, stream); @@ -1356,6 +1358,12 @@ void CompressOrcDataStreams(uint8_t* compressed_data, dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( strm_desc, comp_in, comp_out, comp_res, compressed_data, comp_blk_size, max_comp_blk_size); + + if (collect_statistics) { + return cudf::io::collect_compression_statistics(comp_in, comp_res); + } else { + return std::nullopt; + } } } // namespace gpu diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 57bcf2446fc..40154ac760a 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2282,7 +2282,7 @@ auto convert_table_to_orc_data(table_view const& input, hostdevice_vector{}, // comp_results std::move(strm_descs), intermediate_statistics{stream}, - writer_compression_statistics{}, + std::optional{}, std::move(streams), std::move(stripes), std::move(stripe_dict), @@ -2327,34 +2327,30 @@ auto convert_table_to_orc_data(table_view const& input, // Compress the data streams rmm::device_uvector compressed_data(compressed_bfr_size, stream); hostdevice_vector comp_results(num_compressed_blocks, stream); - writer_compression_statistics compression_stats; + std::optional compression_stats; thrust::fill(rmm::exec_policy(stream), comp_results.d_begin(), comp_results.d_end(), compression_result{0, compression_status::FAILURE}); if (compression_kind != NONE) { strm_descs.host_to_device(stream); - gpu::CompressOrcDataStreams(compressed_data.data(), - num_compressed_blocks, - compression_kind, - compression_blocksize, - max_compressed_block_size, - compressed_block_align, - strm_descs, - enc_data.streams, - comp_results, - stream); + compression_stats = gpu::CompressOrcDataStreams(compressed_data.data(), + num_compressed_blocks, + compression_kind, + compression_blocksize, + max_compressed_block_size, + compressed_block_align, + collect_compression_stats, + strm_descs, + enc_data.streams, + comp_results, + stream); // deallocate encoded data as it is not needed anymore enc_data.data.clear(); strm_descs.device_to_host(stream); comp_results.device_to_host(stream, true); - - // TODO populate compression statistics - if (collect_compression_stats) { - compression_stats = writer_compression_statistics{1, 2, 3, 4}; - } } auto intermediate_stats = gather_statistic_blobs(stats_freq, orc_table, segmentation, stream); @@ -2488,15 +2484,18 @@ void writer::impl::write(table_view const& input) update_statistics(orc_table.num_rows(), intermediate_stats, compression_stats); } -void writer::impl::update_statistics(size_type num_rows, - intermediate_statistics& intermediate_stats, // why not const? - writer_compression_statistics const& compression_stats) +void writer::impl::update_statistics( + size_type num_rows, + intermediate_statistics& intermediate_stats, // why not const? + std::optional const& compression_stats) { if (intermediate_stats.stripe_stat_chunks.size() > 0) { _persisted_stripe_statistics.persist(num_rows, _single_write_mode, intermediate_stats, _stream); } - if (_compression_statistics != nullptr) { *_compression_statistics += compression_stats; } + if (compression_stats.has_value() and _compression_statistics != nullptr) { + *_compression_statistics += compression_stats.value(); + } } void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 4a794059a38..d8729c89583 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -332,7 +332,7 @@ class writer::impl { */ void update_statistics(size_type num_rows, intermediate_statistics& single_table_stats, - writer_compression_statistics const& compression_stats); + std::optional const& compression_stats); private: // CUDA stream. From 056a2eda932ce3b020e1eff2820ff7ecc46a08c9 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 4 May 2023 12:01:33 -0700 Subject: [PATCH 03/17] actual implementation --- cpp/CMakeLists.txt | 1 + cpp/src/io/comp/gpuinflate.hpp | 16 ++++++--- cpp/src/io/comp/statistics.cu | 62 ++++++++++++++++++++++++++++++++++ cpp/src/io/orc/stripe_enc.cu | 2 +- 4 files changed, 75 insertions(+), 6 deletions(-) create mode 100644 cpp/src/io/comp/statistics.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 6d9986178d1..0516653bb65 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -346,6 +346,7 @@ add_library( src/io/comp/nvcomp_adapter.cpp src/io/comp/nvcomp_adapter.cu src/io/comp/snap.cu + src/io/comp/statistics.cu src/io/comp/uncomp.cpp src/io/comp/unsnap.cu src/io/csv/csv_gpu.cu diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 9526063cb12..88e95707fec 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -137,12 +137,18 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -inline writer_compression_statistics collect_compression_statistics( +/** + * @brief Aggregate results of compression into a single statistics object. + * + * @param inputs List of uncompressed input buffers + * @param results List of compression results + * @param stream CUDA stream to use + * @return writer_compression_statistics + */ +[[nodiscard]] writer_compression_statistics collect_compression_statistics( device_span const> inputs, - device_span results) -{ - return writer_compression_statistics{3, 3, 2, 2}; -} + device_span results, + rmm::cuda_stream_view stream); } // namespace io } // namespace cudf diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/statistics.cu new file mode 100644 index 00000000000..60685085c41 --- /dev/null +++ b/cpp/src/io/comp/statistics.cu @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gpuinflate.hpp" + +#include +#include + +namespace cudf::io { + +writer_compression_statistics collect_compression_statistics( + device_span const> inputs, + device_span results, + rmm::cuda_stream_view stream) +{ + // bytes_written on success + auto const output_size_successful = thrust::transform_reduce( + rmm::exec_policy(stream), + results.begin(), + results.end(), + [] __device__(auto& res) { + return res.status == compression_status::SUCCESS ? res.bytes_written : 0; + }, + 0ul, + thrust::plus()); + + auto input_size_with_status = [inputs, results, stream](compression_status status) { + auto zipped_begin = + thrust::make_zip_iterator(thrust::make_tuple(inputs.begin(), results.begin())); + auto zipped_end = thrust::make_zip_iterator(thrust::make_tuple(inputs.end(), results.end())); + + return thrust::transform_reduce( + rmm::exec_policy(stream), + zipped_begin, + zipped_end, + [status] __device__(auto tup) { + return thrust::get<1>(tup).status == status ? thrust::get<0>(tup).size() : 0; + }, + 0ul, + thrust::plus()); + }; + + return writer_compression_statistics{input_size_with_status(compression_status::SUCCESS), + input_size_with_status(compression_status::FAILURE), + input_size_with_status(compression_status::SKIPPED), + output_size_successful}; +} + +} // namespace cudf::io diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 1a09ad9d3fa..7b818e66815 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1360,7 +1360,7 @@ std::optional CompressOrcDataStreams( strm_desc, comp_in, comp_out, comp_res, compressed_data, comp_blk_size, max_comp_blk_size); if (collect_statistics) { - return cudf::io::collect_compression_statistics(comp_in, comp_res); + return cudf::io::collect_compression_statistics(comp_in, comp_res, stream); } else { return std::nullopt; } From 245cae03cca8eaca40ab94d9987661ca524a13b0 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 4 May 2023 15:12:57 -0700 Subject: [PATCH 04/17] docs fix --- cpp/src/io/orc/writer_impl.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index d8729c89583..53e1d38264d 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -327,8 +327,9 @@ class writer::impl { /** * @brief Update writer-level statistics with data from the current table. * - * @param num_rows Number of rows in the current table. - * @param single_table_stats Statistics data from the current table. + * @param num_rows Number of rows in the current table + * @param single_table_stats Statistics data from the current table + * @param compression_stats Compression statistics from the current table */ void update_statistics(size_type num_rows, intermediate_statistics& single_table_stats, From d62a8ecf8015f639d39cc1a0ab70c450eb5dba05 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 4 May 2023 15:25:54 -0700 Subject: [PATCH 05/17] Parquet side --- cpp/include/cudf/io/parquet.hpp | 72 ++++++++++++++++++++++++++++++ cpp/src/io/parquet/writer_impl.cu | 30 ++++++++++++- cpp/src/io/parquet/writer_impl.hpp | 10 +++++ 3 files changed, 110 insertions(+), 2 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 07d41e3b132..796afe9d92d 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -496,6 +496,8 @@ class parquet_writer_options { size_t _max_dictionary_size = default_max_dictionary_size; // Maximum number of rows in a page fragment std::optional _max_page_fragment_size; + // Optional compression statistics + std::shared_ptr _compression_stats; /** * @brief Constructor from sink and table. @@ -670,6 +672,17 @@ class parquet_writer_options { */ [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } + /** + * @brief Returns compression statistics. + * + * @return Compression statistics + */ + [[nodiscard]] std::shared_ptr const& get_compression_statistics() + const + { + return _compression_stats; + } + /** * @brief Sets partitions. * @@ -777,6 +790,16 @@ class parquet_writer_options { * @param size_rows Maximum page fragment size, in rows. */ void set_max_page_fragment_size(size_type size_rows); + + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be updated after writing + */ + void set_compression_statistics(std::shared_ptr const& comp_stats) + { + _compression_stats = comp_stats; + } }; /** @@ -983,6 +1006,19 @@ class parquet_writer_options_builder { */ parquet_writer_options_builder& max_page_fragment_size(size_type val); + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be filled once writer is done + * @return this for chaining + */ + parquet_writer_options_builder& compression_statistics( + std::shared_ptr const& comp_stats) + { + options._compression_stats = comp_stats; + return *this; + } + /** * @brief Sets whether int96 timestamps are written or not in parquet_writer_options. * @@ -1074,6 +1110,8 @@ class chunked_parquet_writer_options { size_t _max_dictionary_size = default_max_dictionary_size; // Maximum number of rows in a page fragment std::optional _max_page_fragment_size; + // Optional compression statistics + std::shared_ptr _compression_stats; /** * @brief Constructor from sink. @@ -1204,6 +1242,17 @@ class chunked_parquet_writer_options { */ [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } + /** + * @brief Returns compression statistics. + * + * @return Compression statistics + */ + [[nodiscard]] std::shared_ptr const& get_compression_statistics() + const + { + return _compression_stats; + } + /** * @brief Sets metadata. * @@ -1297,6 +1346,16 @@ class chunked_parquet_writer_options { */ void set_max_page_fragment_size(size_type size_rows); + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be updated after writing + */ + void set_compression_statistics(std::shared_ptr const& comp_stats) + { + _compression_stats = comp_stats; + } + /** * @brief creates builder to build chunked_parquet_writer_options. * @@ -1503,6 +1562,19 @@ class chunked_parquet_writer_options_builder { */ chunked_parquet_writer_options_builder& max_page_fragment_size(size_type val); + /** + * @brief Sets the pointer to the output compression statistics. + * + * @param comp_stats Pointer to compression statistics to be filled once writer is done + * @return this for chaining + */ + chunked_parquet_writer_options_builder& compression_statistics( + std::shared_ptr const& comp_stats) + { + options._compression_stats = comp_stats; + return *this; + } + /** * @brief move chunked_parquet_writer_options member once it's built. */ diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 898d470d7d1..05d42cd9e2f 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1255,6 +1255,7 @@ void init_encoder_pages(hostdevice_2dvector& chunks, * @param page_stats optional page-level statistics (nullptr if none) * @param chunk_stats optional chunk-level statistics (nullptr if none) * @param column_stats optional page-level statistics for column index (nullptr if none) + * @param comp_stats optional compression statistics (nullopt if none) * @param compression compression format * @param column_index_truncate_length maximum length of min or max values in column index, in bytes * @param stream CUDA stream used for device memory operations and kernel launches @@ -1268,6 +1269,7 @@ void encode_pages(hostdevice_2dvector& chunks, const statistics_chunk* page_stats, const statistics_chunk* chunk_stats, const statistics_chunk* column_stats, + std::optional& comp_stats, Compression compression, int32_t column_index_truncate_length, rmm::cuda_stream_view stream) @@ -1334,6 +1336,10 @@ void encode_pages(hostdevice_2dvector& chunks, d_chunks_in_batch.flat_view().size_bytes(), cudaMemcpyDefault, stream.value())); + + if (comp_stats.has_value()) { + comp_stats.value() += collect_compression_statistics(comp_in, comp_res, stream); + } stream.synchronize(); } @@ -1412,6 +1418,7 @@ void fill_table_meta(std::unique_ptr const& table_meta) * @param column_index_truncate_length maximum length of min or max values in column index, in bytes * @param stats_granularity Level of statistics requested in output file * @param compression Compression format + * @param collect_statistics Flag to indicate if statistics should be collected * @param dict_policy Policy for dictionary use * @param max_dictionary_size Maximum dictionary size, in bytes * @param single_write_mode Flag to indicate that we are guaranteeing a single table write @@ -1434,6 +1441,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, int32_t column_index_truncate_length, statistics_freq stats_granularity, Compression compression, + bool collect_compression_statistics, dictionary_policy dict_policy, size_t max_dictionary_size, single_write_mode write_mode, @@ -1871,6 +1879,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, // Check device write support for all chunks and initialize bounce_buffer. bool all_device_write = true; uint32_t max_write_size = 0; + std::optional comp_stats; + if (collect_compression_statistics) { comp_stats = writer_compression_statistics{}; } // Encode row groups in batches for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { @@ -1892,6 +1902,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, (stats_granularity != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages : nullptr, (stats_granularity == statistics_freq::STATISTICS_COLUMN) ? page_stats.data() : nullptr, + comp_stats, compression, column_index_truncate_length, stream); @@ -1944,6 +1955,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, std::move(first_rg_in_part), std::move(batch_list), std::move(rg_to_part), + std::move(comp_stats), std::move(uncomp_bfr), std::move(comp_bfr), std::move(col_idx_bfr), @@ -1970,7 +1982,8 @@ writer::impl::impl(std::vector> sinks, _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), _single_write_mode(mode), - _out_sink(std::move(sinks)) + _out_sink(std::move(sinks)), + _compression_statistics{options.get_compression_statistics()} { if (options.get_metadata()) { _table_meta = std::make_unique(*options.get_metadata()); @@ -1996,7 +2009,8 @@ writer::impl::impl(std::vector> sinks, _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), _single_write_mode(mode), - _out_sink(std::move(sinks)) + _out_sink(std::move(sinks)), + _compression_statistics{options.get_compression_statistics()} { if (options.get_metadata()) { _table_meta = std::make_unique(*options.get_metadata()); @@ -2018,6 +2032,14 @@ void writer::impl::init_state() std::fill_n(_current_chunk_offset.begin(), _current_chunk_offset.size(), sizeof(file_header_s)); } +void writer::impl::update_compression_statistics( + std::optional const& compression_stats) +{ + if (compression_stats.has_value() and _compression_statistics != nullptr) { + *_compression_statistics += compression_stats.value(); + } +} + void writer::impl::write(table_view const& input, std::vector const& partitions) { _last_write_successful = false; @@ -2036,6 +2058,7 @@ void writer::impl::write(table_view const& input, std::vector co first_rg_in_part, batch_list, rg_to_part, + comp_stats, uncomp_bfr, // unused, but contains data for later write to sink comp_bfr, // unused, but contains data for later write to sink col_idx_bfr, // unused, but contains data for later write to sink @@ -2054,6 +2077,7 @@ void writer::impl::write(table_view const& input, std::vector co _column_index_truncate_length, _stats_granularity, _compression, + _compression_statistics != nullptr, _dict_policy, _max_dictionary_size, _single_write_mode, @@ -2078,6 +2102,8 @@ void writer::impl::write(table_view const& input, std::vector co rg_to_part, bounce_buffer); + update_compression_statistics(comp_stats); + _last_write_successful = true; } diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index a9fe5612bfb..ee4d71f205a 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -93,6 +93,14 @@ class writer::impl { */ void init_state(); + /** + * @brief Updates writer-level statistics with data from the current table. + * + * @param compression_stats Optional compression statistics from the current table + */ + void update_compression_statistics( + std::optional const& compression_stats); + /** * @brief Writes a single subtable as part of a larger parquet file/table write, * normally used for chunked writing. @@ -167,6 +175,8 @@ class writer::impl { std::vector _current_chunk_offset; // To track if the last write(table) call // completed successfully current write // position for rowgroups/chunks. + std::shared_ptr + _compression_statistics; // Optional output compression statistics bool _last_write_successful = false; bool _closed = false; // To track if the output has been written to sink. }; From 6c62bb941dd981fcbacd5760f7831675fcd95120 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 4 May 2023 17:29:46 -0700 Subject: [PATCH 06/17] nan comment --- cpp/include/cudf/io/types.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index fab5346a9ea..da83a61b234 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -186,6 +186,8 @@ class writer_compression_statistics { /** * @brief Returns the compression ratio for the successfully compressed blocks. * + * Returns nan if there were no successfully compressed blocks. + * * @return double The ratio between the size of the compression inputs and the size of the * compressed output. */ From 3ca7b70668c8d301fcba52dd98678c69e4d3fe00 Mon Sep 17 00:00:00 2001 From: vuule Date: Thu, 4 May 2023 17:34:12 -0700 Subject: [PATCH 07/17] orc tests --- cpp/tests/io/orc_test.cpp | 83 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 088cff8d790..e1d44f89d7c 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1705,4 +1705,87 @@ TEST_F(OrcReaderTest, ZstdMaxCompressionRate) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } +TEST_F(OrcWriterTest, CompStats) +{ + auto table = create_random_fixed_table(1, 100000, true); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::orc_writer_options opts = + cudf::io::orc_writer_options::builder(cudf::io::sink_info{&unused_buffer}, table->view()) + .compression_statistics(stats); + cudf::io::write_orc(opts); + + EXPECT_NE(stats->num_compressed_bytes(), 0); + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); + EXPECT_FALSE(std::isnan(stats->compression_ratio())); +} + +TEST_F(OrcChunkedWriterTest, CompStats) +{ + auto table = create_random_fixed_table(1, 100000, true); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::chunked_orc_writer_options opts = + cudf::io::chunked_orc_writer_options::builder(cudf::io::sink_info{&unused_buffer}) + .compression_statistics(stats); + cudf::io::orc_chunked_writer(opts).write(*table); + + EXPECT_NE(stats->num_compressed_bytes(), 0); + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); + EXPECT_FALSE(std::isnan(stats->compression_ratio())); + + auto const single_table_comp_stats = *stats; + cudf::io::orc_chunked_writer(opts).write(*table); + + EXPECT_EQ(stats->compression_ratio(), single_table_comp_stats.compression_ratio()); + EXPECT_EQ(stats->num_compressed_bytes(), 2 * single_table_comp_stats.num_compressed_bytes()); + + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); +} + +void expect_compression_stats_empty(std::shared_ptr stats) +{ + EXPECT_EQ(stats->num_compressed_bytes(), 0); + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); + EXPECT_TRUE(std::isnan(stats->compression_ratio())); +} + +TEST_F(OrcWriterTest, CompStatsEmptyTable) +{ + auto table_no_rows = create_random_fixed_table(20, 0, false); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::orc_writer_options opts = cudf::io::orc_writer_options::builder( + cudf::io::sink_info{&unused_buffer}, table_no_rows->view()) + .compression_statistics(stats); + cudf::io::write_orc(opts); + + expect_compression_stats_empty(stats); +} + +TEST_F(OrcChunkedWriterTest, CompStatsEmptyTable) +{ + auto table_no_rows = create_random_fixed_table(20, 0, false); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::chunked_orc_writer_options opts = + cudf::io::chunked_orc_writer_options::builder(cudf::io::sink_info{&unused_buffer}) + .compression_statistics(stats); + cudf::io::orc_chunked_writer(opts).write(*table_no_rows); + + expect_compression_stats_empty(stats); +} + CUDF_TEST_PROGRAM_MAIN() From f4b99d0facf626cf4cd3c5c25621863ec8a98952 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 9 May 2023 14:10:42 -0700 Subject: [PATCH 08/17] parquet tests --- cpp/tests/io/parquet_test.cpp | 84 +++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 8662e1d5bd0..3af2347638a 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -5436,4 +5436,88 @@ TEST_F(ParquetReaderTest, ChunkedSingleLevelLists) EXPECT_TRUE(iterations < 10); } +TEST_F(ParquetWriterTest, CompStats) +{ + auto table = create_random_fixed_table(1, 100000, true); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&unused_buffer}, table->view()) + .compression_statistics(stats); + cudf::io::write_parquet(opts); + + EXPECT_NE(stats->num_compressed_bytes(), 0); + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); + EXPECT_FALSE(std::isnan(stats->compression_ratio())); +} + +TEST_F(ParquetChunkedWriterTest, CompStats) +{ + auto table = create_random_fixed_table(1, 100000, true); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::chunked_parquet_writer_options opts = + cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{&unused_buffer}) + .compression_statistics(stats); + cudf::io::parquet_chunked_writer(opts).write(*table); + + EXPECT_NE(stats->num_compressed_bytes(), 0); + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); + EXPECT_FALSE(std::isnan(stats->compression_ratio())); + + auto const single_table_comp_stats = *stats; + cudf::io::parquet_chunked_writer(opts).write(*table); + + EXPECT_EQ(stats->compression_ratio(), single_table_comp_stats.compression_ratio()); + EXPECT_EQ(stats->num_compressed_bytes(), 2 * single_table_comp_stats.num_compressed_bytes()); + + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); +} + +void expect_compression_stats_empty(std::shared_ptr stats) +{ + EXPECT_EQ(stats->num_compressed_bytes(), 0); + EXPECT_EQ(stats->num_failed_bytes(), 0); + EXPECT_EQ(stats->num_skipped_bytes(), 0); + EXPECT_TRUE(std::isnan(stats->compression_ratio())); +} + +TEST_F(ParquetWriterTest, CompStatsEmptyTable) +{ + auto table_no_rows = create_random_fixed_table(20, 0, false); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{&unused_buffer}, + table_no_rows->view()) + .compression_statistics(stats); + cudf::io::write_parquet(opts); + + expect_compression_stats_empty(stats); +} + +TEST_F(ParquetChunkedWriterTest, CompStatsEmptyTable) +{ + auto table_no_rows = create_random_fixed_table(20, 0, false); + + auto stats = std::make_shared(); + + std::vector unused_buffer; + cudf::io::chunked_parquet_writer_options opts = + cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{&unused_buffer}) + .compression_statistics(stats); + cudf::io::parquet_chunked_writer(opts).write(*table_no_rows); + + expect_compression_stats_empty(stats); +} + CUDF_TEST_PROGRAM_MAIN() From 4794064af877c472dfe5a227d9b4a16a71e85023 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 9 May 2023 16:36:20 -0700 Subject: [PATCH 09/17] docs --- cpp/include/cudf/io/orc.hpp | 4 ++-- cpp/include/cudf/io/parquet.hpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index e74beef526b..5af25a9910c 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -563,7 +563,7 @@ class orc_writer_options { } /** - * @brief Returns compression statistics. + * @brief Returns a pointer to the user-provided compression statistics. * * @return Compression statistics */ @@ -958,7 +958,7 @@ class chunked_orc_writer_options { } /** - * @brief Returns compression statistics. + * @brief Returns a pointer to the user-provided compression statistics. * * @return Compression statistics */ diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 796afe9d92d..3f095fa9419 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -673,7 +673,7 @@ class parquet_writer_options { [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } /** - * @brief Returns compression statistics. + * @brief Returns a pointer to the user-provided compression statistics. * * @return Compression statistics */ @@ -1243,7 +1243,7 @@ class chunked_parquet_writer_options { [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } /** - * @brief Returns compression statistics. + * @brief Returns a pointer to the user-provided compression statistics. * * @return Compression statistics */ From 2050818d3aa6ffcf9cf6745bb68a9c8acf6372c7 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 9 May 2023 16:53:28 -0700 Subject: [PATCH 10/17] member order; formatting --- cpp/include/cudf/io/types.hpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index da83a61b234..e8962544c87 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -104,11 +104,6 @@ enum statistics_freq { * @brief Statistics about compression performed by a writer. */ class writer_compression_statistics { - size_t _num_compressed_bytes = 0; ///< The number of bytes that were successfully compressed - size_t _num_failed_bytes = 0; ///< The number of bytes that failed to compress - size_t _num_skipped_bytes = 0; ///< The number of bytes that were skipped during compression - size_t _num_compressed_output_bytes = 0; ///< The number of bytes in the compressed output - public: /** * @brief Default constructor @@ -195,6 +190,12 @@ class writer_compression_statistics { { return static_cast(num_compressed_bytes()) / _num_compressed_output_bytes; } + + private: + size_t _num_compressed_bytes = 0; ///< The number of bytes that were successfully compressed + size_t _num_failed_bytes = 0; ///< The number of bytes that failed to compress + size_t _num_skipped_bytes = 0; ///< The number of bytes that were skipped during compression + size_t _num_compressed_output_bytes = 0; ///< The number of bytes in the compressed output }; /** @@ -266,8 +267,8 @@ struct host_buffer { }; /** - * @brief Returns `true` if the type is byte-like, meaning it is reasonable to pass as a pointer - * to bytes. + * @brief Returns `true` if the type is byte-like, meaning it is reasonable to pass as a pointer to + * bytes. * * @tparam T The representation type * @return `true` if the type is considered a byte-like type @@ -731,8 +732,7 @@ class column_in_metadata { [[nodiscard]] bool nullable() const { return _nullable.value(); } /** - * @brief If this is the metadata of a list column, returns whether it is to be encoded as a - * map. + * @brief If this is the metadata of a list column, returns whether it is to be encoded as a map. * * @return Boolean indicating whether this column is to be encoded as a map */ From f34cb9f4f0c24846d99595a095b03b1a53436fb4 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 9 May 2023 17:02:56 -0700 Subject: [PATCH 11/17] sink intermediate_stats --- cpp/src/io/orc/writer_impl.cu | 9 +++++---- cpp/src/io/orc/writer_impl.hpp | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 40154ac760a..61f002acb41 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -376,7 +376,7 @@ __global__ void copy_string_data(char* string_pool, void persisted_statistics::persist(int num_table_rows, single_write_mode write_mode, - intermediate_statistics& intermediate_stats, + intermediate_statistics&& intermediate_stats, rmm::cuda_stream_view stream) { if (write_mode == single_write_mode::NO) { @@ -2481,16 +2481,17 @@ void writer::impl::write(table_view const& input) add_table_to_footer_data(orc_table, stripes); // Update file-level and compression statistics - update_statistics(orc_table.num_rows(), intermediate_stats, compression_stats); + update_statistics(orc_table.num_rows(), std::move(intermediate_stats), compression_stats); } void writer::impl::update_statistics( size_type num_rows, - intermediate_statistics& intermediate_stats, // why not const? + intermediate_statistics&& intermediate_stats, std::optional const& compression_stats) { if (intermediate_stats.stripe_stat_chunks.size() > 0) { - _persisted_stripe_statistics.persist(num_rows, _single_write_mode, intermediate_stats, _stream); + _persisted_stripe_statistics.persist( + num_rows, _single_write_mode, std::move(intermediate_stats), _stream); } if (compression_stats.has_value() and _compression_statistics != nullptr) { diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 53e1d38264d..45a6fe981fe 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -210,7 +210,7 @@ struct persisted_statistics { void persist(int num_table_rows, single_write_mode write_mode, - intermediate_statistics& intermediate_stats, + intermediate_statistics&& intermediate_stats, rmm::cuda_stream_view stream); std::vector> stripe_stat_chunks; @@ -332,7 +332,7 @@ class writer::impl { * @param compression_stats Compression statistics from the current table */ void update_statistics(size_type num_rows, - intermediate_statistics& single_table_stats, + intermediate_statistics&& single_table_stats, std::optional const& compression_stats); private: From c90fc6814a31265ad88bb13cb0c12782c2aa122a Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 9 May 2023 17:03:07 -0700 Subject: [PATCH 12/17] comments --- cpp/src/io/orc/writer_impl.hpp | 2 +- cpp/src/io/parquet/writer_impl.hpp | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 45a6fe981fe..6c8f2483496 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -344,7 +344,7 @@ class writer::impl { size_type const _row_index_stride; CompressionKind const _compression_kind; size_t const _compression_blocksize; - std::shared_ptr _compression_statistics; + std::shared_ptr _compression_statistics; // Optional output statistics_freq const _stats_freq; single_write_mode const _single_write_mode; // Special parameter only used by `write()` to // indicate that we are guaranteeing a single table diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index ee4d71f205a..45a89eaba0b 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -175,8 +175,7 @@ class writer::impl { std::vector _current_chunk_offset; // To track if the last write(table) call // completed successfully current write // position for rowgroups/chunks. - std::shared_ptr - _compression_statistics; // Optional output compression statistics + std::shared_ptr _compression_statistics; // Optional output bool _last_write_successful = false; bool _closed = false; // To track if the output has been written to sink. }; From cb4bde24212fccb1971f0077a122537809381f8e Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 15 May 2023 15:28:24 -0700 Subject: [PATCH 13/17] getter type --- cpp/include/cudf/io/orc.hpp | 10 ++++------ cpp/include/cudf/io/parquet.hpp | 10 ++++------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 5af25a9910c..8fcae890899 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -563,12 +563,11 @@ class orc_writer_options { } /** - * @brief Returns a pointer to the user-provided compression statistics. + * @brief Returns a shared pointer to the user-provided compression statistics. * * @return Compression statistics */ - [[nodiscard]] std::shared_ptr const& get_compression_statistics() - const + [[nodiscard]] std::shared_ptr get_compression_statistics() const { return _compression_stats; } @@ -958,12 +957,11 @@ class chunked_orc_writer_options { } /** - * @brief Returns a pointer to the user-provided compression statistics. + * @brief Returns a shared pointer to the user-provided compression statistics. * * @return Compression statistics */ - [[nodiscard]] std::shared_ptr const& get_compression_statistics() - const + [[nodiscard]] std::shared_ptr get_compression_statistics() const { return _compression_stats; } diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 3f095fa9419..dc72bb99a3c 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -673,12 +673,11 @@ class parquet_writer_options { [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } /** - * @brief Returns a pointer to the user-provided compression statistics. + * @brief Returns a shared pointer to the user-provided compression statistics. * * @return Compression statistics */ - [[nodiscard]] std::shared_ptr const& get_compression_statistics() - const + [[nodiscard]] std::shared_ptr get_compression_statistics() const { return _compression_stats; } @@ -1243,12 +1242,11 @@ class chunked_parquet_writer_options { [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } /** - * @brief Returns a pointer to the user-provided compression statistics. + * @brief Returns a shared pointer to the user-provided compression statistics. * * @return Compression statistics */ - [[nodiscard]] std::shared_ptr const& get_compression_statistics() - const + [[nodiscard]] std::shared_ptr get_compression_statistics() const { return _compression_stats; } From e032a03396d84721dcec71bf9bdcd37ec45bd1ff Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 15 May 2023 15:28:58 -0700 Subject: [PATCH 14/17] const++ --- cpp/src/io/comp/statistics.cu | 4 ++-- cpp/tests/io/orc_test.cpp | 8 ++++---- cpp/tests/io/parquet_test.cpp | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/statistics.cu index 60685085c41..e0f7e1ec6dd 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/statistics.cu @@ -38,9 +38,9 @@ writer_compression_statistics collect_compression_statistics( thrust::plus()); auto input_size_with_status = [inputs, results, stream](compression_status status) { - auto zipped_begin = + auto const zipped_begin = thrust::make_zip_iterator(thrust::make_tuple(inputs.begin(), results.begin())); - auto zipped_end = thrust::make_zip_iterator(thrust::make_tuple(inputs.end(), results.end())); + auto const zipped_end = zipped_begin + inputs.size(); return thrust::transform_reduce( rmm::exec_policy(stream), diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index e1d44f89d7c..60b3dad5568 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1709,7 +1709,7 @@ TEST_F(OrcWriterTest, CompStats) { auto table = create_random_fixed_table(1, 100000, true); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::orc_writer_options opts = @@ -1727,7 +1727,7 @@ TEST_F(OrcChunkedWriterTest, CompStats) { auto table = create_random_fixed_table(1, 100000, true); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::chunked_orc_writer_options opts = @@ -1762,7 +1762,7 @@ TEST_F(OrcWriterTest, CompStatsEmptyTable) { auto table_no_rows = create_random_fixed_table(20, 0, false); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::orc_writer_options opts = cudf::io::orc_writer_options::builder( @@ -1777,7 +1777,7 @@ TEST_F(OrcChunkedWriterTest, CompStatsEmptyTable) { auto table_no_rows = create_random_fixed_table(20, 0, false); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::chunked_orc_writer_options opts = diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 3af2347638a..53a85df7053 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -5440,7 +5440,7 @@ TEST_F(ParquetWriterTest, CompStats) { auto table = create_random_fixed_table(1, 100000, true); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::parquet_writer_options opts = @@ -5458,7 +5458,7 @@ TEST_F(ParquetChunkedWriterTest, CompStats) { auto table = create_random_fixed_table(1, 100000, true); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::chunked_parquet_writer_options opts = @@ -5493,7 +5493,7 @@ TEST_F(ParquetWriterTest, CompStatsEmptyTable) { auto table_no_rows = create_random_fixed_table(20, 0, false); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::parquet_writer_options opts = @@ -5509,7 +5509,7 @@ TEST_F(ParquetChunkedWriterTest, CompStatsEmptyTable) { auto table_no_rows = create_random_fixed_table(20, 0, false); - auto stats = std::make_shared(); + auto const stats = std::make_shared(); std::vector unused_buffer; cudf::io::chunked_parquet_writer_options opts = From 6c97eb5ae08197cfb0737f32313193fca1ddb18f Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 15 May 2023 15:33:43 -0700 Subject: [PATCH 15/17] span --- cpp/src/io/orc/orc_gpu.hpp | 2 +- cpp/src/io/orc/stripe_enc.cu | 12 ++++++------ cpp/src/io/orc/writer_impl.cu | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 6059d6adf16..48917f2aff1 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -360,7 +360,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @return Compression statistics (if requested) */ std::optional CompressOrcDataStreams( - uint8_t* compressed_data, + device_span compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 7b818e66815..0e76dc13471 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1136,7 +1136,7 @@ __global__ void __launch_bounds__(256) device_span> inputs, device_span> outputs, device_span results, - uint8_t* compressed_bfr, + device_span compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size, uint32_t comp_block_align) @@ -1159,7 +1159,7 @@ __global__ void __launch_bounds__(256) } __syncthreads(); src = uncomp_base_g; - dst = compressed_bfr + ss.bfr_offset; + dst = compressed_bfr.data() + ss.bfr_offset; num_blocks = (ss.stream_size > 0) ? (ss.stream_size - 1) / comp_blk_size + 1 : 1; for (uint32_t b = t; b < num_blocks; b += 256) { uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); @@ -1190,7 +1190,7 @@ __global__ void __launch_bounds__(1024) device_span const> inputs, device_span const> outputs, device_span results, - uint8_t* compressed_bfr, + device_span compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) { @@ -1209,7 +1209,7 @@ __global__ void __launch_bounds__(1024) __syncthreads(); num_blocks = (ss.stream_size > 0) ? (ss.stream_size - 1) / comp_blk_size + 1 : 0; - dst = compressed_bfr + ss.bfr_offset; + dst = compressed_bfr.data() + ss.bfr_offset; b = 0; do { if (t == 0) { @@ -1255,7 +1255,7 @@ __global__ void __launch_bounds__(1024) // Update stripe stream with the compressed size if (t == 0) { strm_desc[stripe_id][stream_id].stream_size = - static_cast(dst - (compressed_bfr + ss.bfr_offset)); + static_cast(dst - (compressed_bfr.data() + ss.bfr_offset)); } } @@ -1292,7 +1292,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, } std::optional CompressOrcDataStreams( - uint8_t* compressed_data, + device_span compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 61f002acb41..a7861c60ba3 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -2334,7 +2334,7 @@ auto convert_table_to_orc_data(table_view const& input, compression_result{0, compression_status::FAILURE}); if (compression_kind != NONE) { strm_descs.host_to_device(stream); - compression_stats = gpu::CompressOrcDataStreams(compressed_data.data(), + compression_stats = gpu::CompressOrcDataStreams(compressed_data, num_compressed_blocks, compression_kind, compression_blocksize, From 805a68e0fe981c740c77e1b9c1eb5c18b331b19b Mon Sep 17 00:00:00 2001 From: vuule Date: Mon, 15 May 2023 15:38:53 -0700 Subject: [PATCH 16/17] setter --- cpp/include/cudf/io/orc.hpp | 12 ++++++------ cpp/include/cudf/io/parquet.hpp | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 8fcae890899..162c3afcce5 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -666,9 +666,9 @@ class orc_writer_options { * * @param comp_stats Pointer to compression statistics to be updated after writing */ - void set_compression_statistics(std::shared_ptr const& comp_stats) + void set_compression_statistics(std::shared_ptr comp_stats) { - _compression_stats = comp_stats; + _compression_stats = std::move(comp_stats); } }; @@ -1049,13 +1049,13 @@ class chunked_orc_writer_options { } /** - * @brief Sets compression statistics. + * @brief Sets the pointer to the output compression statistics. * - * @param comp_stats Pointer to compression statistics to be updated after writing each table + * @param comp_stats Pointer to compression statistics to be updated after writing */ - void set_compression_statistics(std::shared_ptr const& comp_stats) + void set_compression_statistics(std::shared_ptr comp_stats) { - _compression_stats = comp_stats; + _compression_stats = std::move(comp_stats); } }; diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index dc72bb99a3c..582dd1ec557 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -795,9 +795,9 @@ class parquet_writer_options { * * @param comp_stats Pointer to compression statistics to be updated after writing */ - void set_compression_statistics(std::shared_ptr const& comp_stats) + void set_compression_statistics(std::shared_ptr comp_stats) { - _compression_stats = comp_stats; + _compression_stats = std::move(comp_stats); } }; @@ -1349,9 +1349,9 @@ class chunked_parquet_writer_options { * * @param comp_stats Pointer to compression statistics to be updated after writing */ - void set_compression_statistics(std::shared_ptr const& comp_stats) + void set_compression_statistics(std::shared_ptr comp_stats) { - _compression_stats = comp_stats; + _compression_stats = std::move(comp_stats); } /** From 6d33d2cb5e8bbaa56540da7a9cc97ef95f9ac581 Mon Sep 17 00:00:00 2001 From: vuule Date: Tue, 16 May 2023 11:26:53 -0700 Subject: [PATCH 17/17] std for size_t --- cpp/include/cudf/io/types.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index e8962544c87..ffddf1b5a41 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -192,10 +192,10 @@ class writer_compression_statistics { } private: - size_t _num_compressed_bytes = 0; ///< The number of bytes that were successfully compressed - size_t _num_failed_bytes = 0; ///< The number of bytes that failed to compress - size_t _num_skipped_bytes = 0; ///< The number of bytes that were skipped during compression - size_t _num_compressed_output_bytes = 0; ///< The number of bytes in the compressed output + std::size_t _num_compressed_bytes = 0; ///< The number of bytes that were successfully compressed + std::size_t _num_failed_bytes = 0; ///< The number of bytes that failed to compress + std::size_t _num_skipped_bytes = 0; ///< The number of bytes that were skipped during compression + std::size_t _num_compressed_output_bytes = 0; ///< The number of bytes in the compressed output }; /**