diff --git a/cpp/include/cudf/io/nvcomp_adapter.hpp b/cpp/include/cudf/io/nvcomp_adapter.hpp index 0d74a4158ad..4ad760d278f 100644 --- a/cpp/include/cudf/io/nvcomp_adapter.hpp +++ b/cpp/include/cudf/io/nvcomp_adapter.hpp @@ -22,7 +22,7 @@ #include namespace CUDF_EXPORT cudf { -namespace io::nvcomp { +namespace io::detail::nvcomp { enum class compression_type { SNAPPY, ZSTD, DEFLATE, LZ4, GZIP }; @@ -88,5 +88,5 @@ inline bool operator==(feature_status_parameters const& lhs, feature_status_para [[nodiscard]] std::optional is_decompression_disabled( compression_type compression, feature_status_parameters params = feature_status_parameters()); -} // namespace io::nvcomp +} // namespace io::detail::nvcomp } // namespace CUDF_EXPORT cudf diff --git a/cpp/src/io/comp/common.hpp b/cpp/src/io/comp/common.hpp new file mode 100644 index 00000000000..a81ac60e03a --- /dev/null +++ b/cpp/src/io/comp/common.hpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace cudf::io::detail { + +/** + * @brief The size used for padding a data buffer's size to a multiple of the padding. + * + * Padding is necessary for input/output buffers of several compression/decompression kernels + * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require + * padding to the buffers so that the pointers can shift along the address space to satisfy their + * alignment requirement. + * + * In the meantime, it is not entirely clear why such padding is needed. We need to further + * investigate and implement a better fix rather than just padding the buffer. + * See https://github.com/rapidsai/cudf/issues/13605. + */ +constexpr std::size_t BUFFER_PADDING_MULTIPLE{8}; + +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp index 2dda2287e09..26535bed43b 100644 --- a/cpp/src/io/comp/comp.cpp +++ b/cpp/src/io/comp/comp.cpp @@ -87,15 +87,14 @@ std::vector compress_snappy(host_span src, outputs[0] = d_dst; outputs.host_to_device_async(stream); - cudf::detail::hostdevice_vector hd_status(1, stream); + cudf::detail::hostdevice_vector hd_status(1, stream); hd_status[0] = {}; hd_status.host_to_device_async(stream); nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream); hd_status.device_to_host_sync(stream); - CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS, - "snappy compression failed"); + CUDF_EXPECTS(hd_status[0].status == compression_status::SUCCESS, "snappy compression failed"); return cudf::detail::make_std_vector_sync(d_dst, stream); } diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp index 652abbbeda6..e16f26e1f06 100644 --- a/cpp/src/io/comp/comp.hpp +++ b/cpp/src/io/comp/comp.hpp @@ -16,16 +16,34 @@ #pragma once +#include "common.hpp" + #include #include -#include -#include #include namespace CUDF_EXPORT cudf { namespace io::detail { +/** + * @brief Status of a compression/decompression operation. + */ +enum class compression_status : uint8_t { + SUCCESS, ///< Successful, output is valid + FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way) + SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used) + OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output +}; + +/** + * @brief Descriptor of compression/decompression result. + */ +struct compression_result { + uint64_t bytes_written; + compression_status status; +}; + /** * @brief Compresses a system memory buffer. * diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index 72649dbe427..151f72d262e 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -63,8 +63,8 @@ THE SOFTWARE. #include -namespace cudf { -namespace io { +namespace cudf::io::detail { + constexpr uint32_t huffman_lookup_table_width = 8; constexpr int8_t brotli_code_length_codes = 18; constexpr uint32_t brotli_num_distance_short_codes = 16; @@ -2020,7 +2020,6 @@ CUDF_KERNEL void __launch_bounds__(block_size, 2) results[block_id].status = (s->error == 0) ? compression_status::SUCCESS : compression_status::FAILURE; // Return ext heap used by last block (statistics) - results[block_id].reserved = s->fb_size; } } @@ -2115,5 +2114,4 @@ void gpu_debrotli(device_span const> inputs, #endif } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 090ea1430b5..6e5ce4ce6c3 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -49,8 +49,7 @@ Mark Adler madler@alumni.caltech.edu #include -namespace cudf { -namespace io { +namespace cudf::io::detail { constexpr int max_bits = 15; // maximum bits in a code constexpr int max_l_codes = 286; // maximum number of literal/length codes @@ -1139,7 +1138,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) default: return compression_status::FAILURE; } }(); - results[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes } } @@ -1224,5 +1222,4 @@ void gpu_copy_uncompressed_blocks(device_span const> } } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 8bfca2b30df..4b09bd5a84c 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -16,6 +16,8 @@ #pragma once +#include "io/comp/comp.hpp" + #include #include #include @@ -24,44 +26,10 @@ #include -namespace cudf { -namespace io { - -/** - * @brief Status of a compression/decompression operation. - */ -enum class compression_status : uint8_t { - SUCCESS, ///< Successful, output is valid - FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way) - SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used) - OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output -}; - -/** - * @brief Descriptor of compression/decompression result. - */ -struct compression_result { - uint64_t bytes_written; - compression_status status; - uint32_t reserved; -}; +namespace cudf::io::detail { enum class gzip_header_included { NO, YES }; -/** - * @brief The value used for padding a data buffer such that its size will be multiple of it. - * - * Padding is necessary for input/output buffers of several compression/decompression kernels - * (inflate_kernel and nvcomp snappy). Such kernels operate on aligned data pointers, which require - * padding to the buffers so that the pointers can shift along the address space to satisfy their - * alignment requirement. - * - * In the meantime, it is not entirely clear why such padding is needed. We need to further - * investigate and implement a better fix rather than just padding the buffer. - * See https://github.com/rapidsai/cudf/issues/13605. - */ -constexpr std::size_t BUFFER_PADDING_MULTIPLE{8}; - /** * @brief Interface for decompressing GZIP-compressed data * @@ -169,5 +137,4 @@ void gpu_snap(device_span const> inputs, device_span results, rmm::cuda_stream_view stream); -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp index ca722a9b7ee..711a1c3274f 100644 --- a/cpp/src/io/comp/io_uncomp.hpp +++ b/cpp/src/io/comp/io_uncomp.hpp @@ -16,15 +16,13 @@ #pragma once +#include "common.hpp" + #include #include -#include -#include #include -using cudf::host_span; - namespace CUDF_EXPORT cudf { namespace io::detail { diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index d45c02f374f..3a4e315348c 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -30,7 +30,7 @@ #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { namespace { // Dispatcher for nvcompBatchedDecompressGetTempSizeEx @@ -478,4 +478,4 @@ std::optional compress_max_allowed_chunk_size(compression_type compressi } } -} // namespace cudf::io::nvcomp +} // namespace cudf::io::detail::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cu b/cpp/src/io/comp/nvcomp_adapter.cu index 794d452ebf2..cf5996dfd93 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cu +++ b/cpp/src/io/comp/nvcomp_adapter.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ #include #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { batched_args create_batched_nvcomp_args(device_span const> inputs, device_span const> outputs, @@ -127,4 +127,4 @@ std::pair max_chunk_and_total_input_size(device_span @@ -27,7 +27,7 @@ #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { struct batched_args { rmm::device_uvector input_data_ptrs; @@ -76,4 +76,4 @@ void skip_unsupported_inputs(device_span input_sizes, std::pair max_chunk_and_total_input_size(device_span input_sizes, rmm::cuda_stream_view stream); -} // namespace cudf::io::nvcomp +} // namespace cudf::io::detail::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp index 2e1cda2d6b7..5c402523168 100644 --- a/cpp/src/io/comp/nvcomp_adapter.hpp +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -16,7 +16,7 @@ #pragma once -#include "gpuinflate.hpp" +#include "io/comp/comp.hpp" #include #include @@ -25,7 +25,7 @@ #include -namespace cudf::io::nvcomp { +namespace cudf::io::detail::nvcomp { /** * @brief Device batch decompression of given type. * @@ -103,4 +103,4 @@ void batched_compress(compression_type compression, device_span results, rmm::cuda_stream_view stream); -} // namespace cudf::io::nvcomp +} // namespace cudf::io::detail::nvcomp diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 7d4dcffa713..1443bfd38a2 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -19,8 +19,7 @@ #include -namespace cudf { -namespace io { +namespace cudf::io::detail { constexpr int hash_bits = 12; // TBD: Tentatively limits to 2-byte codes to prevent long copy search followed by long literal @@ -329,7 +328,6 @@ CUDF_KERNEL void __launch_bounds__(128) results[blockIdx.x].bytes_written = s->dst - s->dst_base; results[blockIdx.x].status = (s->dst > s->end) ? compression_status::FAILURE : compression_status::SUCCESS; - results[blockIdx.x].reserved = 0; } } @@ -345,5 +343,4 @@ void gpu_snap(device_span const> inputs, } } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/statistics.cu b/cpp/src/io/comp/statistics.cu index faf967041bc..caee9145d2c 100644 --- a/cpp/src/io/comp/statistics.cu +++ b/cpp/src/io/comp/statistics.cu @@ -21,7 +21,7 @@ #include #include -namespace cudf::io { +namespace cudf::io::detail { writer_compression_statistics collect_compression_statistics( device_span const> inputs, @@ -61,4 +61,4 @@ writer_compression_statistics collect_compression_statistics( output_size_successful}; } -} // namespace cudf::io +} // namespace cudf::io::detail diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 9b01272ac70..cf841c435a3 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -21,12 +21,10 @@ #include -namespace cudf { -namespace io { +namespace cudf::io::detail { constexpr int32_t batch_size = (1 << 5); constexpr int32_t batch_count = (1 << 2); constexpr int32_t prefetch_size = (1 << 9); // 512B, in 32B chunks -constexpr bool log_cyclecount = false; void __device__ busy_wait(size_t cycles) { @@ -647,7 +645,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) auto cur = s->src.begin(); auto const end = s->src.end(); s->error = 0; - if (log_cyclecount) { s->tstart = clock(); } if (cur < end) { // Read uncompressed size (varint), limited to 32-bit uint32_t uncompressed_size = *cur++; @@ -705,11 +702,6 @@ CUDF_KERNEL void __launch_bounds__(block_size) results[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; results[strm_id].status = (s->error == 0) ? compression_status::SUCCESS : compression_status::FAILURE; - if (log_cyclecount) { - results[strm_id].reserved = clock() - s->tstart; - } else { - results[strm_id].reserved = 0; - } } } @@ -724,5 +716,4 @@ void gpu_unsnap(device_span const> inputs, unsnap_kernel<128><<>>(inputs, outputs, results); } -} // namespace io -} // namespace cudf +} // namespace cudf::io::detail diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 654ee1e012c..f4e75f78dec 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -16,7 +16,7 @@ #pragma once -#include "io/comp/gpuinflate.hpp" +#include "io/comp/comp.hpp" #include "io/statistics/statistics.cuh" #include "io/utilities/column_buffer.hpp" #include "orc.hpp" @@ -73,14 +73,14 @@ struct CompressedStreamInfo { uint8_t const* compressed_data{}; // [in] base ptr to compressed stream data uint8_t* uncompressed_data{}; // [in] base ptr to uncompressed stream data or NULL if not known yet - size_t compressed_data_size{}; // [in] compressed data size for this stream - device_span* dec_in_ctl{}; // [in] input buffer to decompress - device_span* dec_out_ctl{}; // [in] output buffer to decompress into - device_span dec_res{}; // [in] results of decompression - device_span* copy_in_ctl{}; // [out] input buffer to copy - device_span* copy_out_ctl{}; // [out] output buffer to copy to - uint32_t num_compressed_blocks{}; // [in,out] number of entries in decctl(in), number of - // compressed blocks(out) + size_t compressed_data_size{}; // [in] compressed data size for this stream + device_span* dec_in_ctl{}; // [in] input buffer to decompress + device_span* dec_out_ctl{}; // [in] output buffer to decompress into + device_span dec_res{}; // [in] results of decompression + device_span* copy_in_ctl{}; // [out] input buffer to copy + device_span* copy_out_ctl{}; // [out] output buffer to copy to + uint32_t num_compressed_blocks{}; // [in,out] number of entries in decctl(in), number of + // compressed blocks(out) uint32_t num_uncompressed_blocks{}; // [in,out] number of entries in dec_in_ctl(in), number of // uncompressed blocks(out) uint64_t max_uncompressed_size{}; // [out] maximum uncompressed data size of stream @@ -414,7 +414,7 @@ std::optional CompressOrcDataStreams( bool collect_statistics, device_2dspan strm_desc, device_2dspan enc_streams, - device_span comp_res, + device_span comp_res, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu index 0081ed30d17..b661bb4ff90 100644 --- a/cpp/src/io/orc/reader_impl_decode.cu +++ b/cpp/src/io/orc/reader_impl_decode.cu @@ -269,7 +269,7 @@ rmm::device_buffer decompress_stripe_data( num_uncompressed_blocks}; device_span> copy_out_view{inflate_out.data() + num_compressed_blocks, num_uncompressed_blocks}; - gpu_copy_uncompressed_blocks(copy_in_view, copy_out_view, stream); + cudf::io::detail::gpu_copy_uncompressed_blocks(copy_in_view, copy_out_view, stream); } // Copy without stream sync, thus need to wait for stream sync below to access. diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 07172b6b7f7..79ecca0ca99 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/block_utils.cuh" #include "io/utilities/time_utils.cuh" @@ -44,7 +45,11 @@ namespace io { namespace orc { namespace gpu { +namespace nvcomp = cudf::io::detail::nvcomp; + using cudf::detail::device_2dspan; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; constexpr int scratch_buffer_size = 512 * 4; constexpr int compact_streams_block_size = 1024; @@ -1385,7 +1390,7 @@ std::optional CompressOrcDataStreams( if (compression == SNAPPY) { try { if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) { - gpu_snap(comp_in, comp_out, comp_res, stream); + cudf::io::detail::gpu_snap(comp_in, comp_out, comp_res, stream); } else { nvcomp::batched_compress( nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); @@ -1429,7 +1434,7 @@ std::optional CompressOrcDataStreams( strm_desc, comp_in, comp_out, comp_res, compressed_data, comp_blk_size, max_comp_blk_size); if (collect_statistics) { - return cudf::io::collect_compression_statistics(comp_in, comp_res, stream); + return cudf::io::detail::collect_compression_statistics(comp_in, comp_res, stream); } else { return std::nullopt; } diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 6b9c19368dc..ce868b83c04 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -71,6 +71,8 @@ namespace cudf::io::orc::detail { +namespace nvcomp = cudf::io::detail::nvcomp; + template [[nodiscard]] constexpr int varint_size(T val) { @@ -2023,8 +2025,8 @@ size_t max_compression_output_size(CompressionKind compression_kind, uint32_t co { if (compression_kind == NONE) return 0; - return compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); + return nvcomp::compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), + compression_blocksize); } std::unique_ptr make_table_meta(table_view const& input) diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index e9558735929..a1edd21f8a2 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -51,6 +51,9 @@ namespace { using ::cudf::detail::device_2dspan; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; + constexpr int encode_block_size = 128; constexpr int rle_buffer_size = 2 * encode_block_size; constexpr int num_encode_warps = encode_block_size / cudf::detail::warp_size; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index ce9d48693ec..b2563ab5065 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -17,7 +17,7 @@ #pragma once #include "error.hpp" -#include "io/comp/gpuinflate.hpp" +#include "io/comp/comp.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_common.hpp" #include "io/statistics/statistics.cuh" @@ -599,12 +599,12 @@ struct EncColumnChunk { */ struct EncPage { // all pointers at the top to keep things properly aligned - uint8_t* page_data; //!< Ptr to uncompressed page - uint8_t* compressed_data; //!< Ptr to compressed page - EncColumnChunk* chunk; //!< Chunk that this page belongs to - compression_result* comp_res; //!< Ptr to compression result - uint32_t* def_histogram; //!< Histogram of counts for each definition level - uint32_t* rep_histogram; //!< Histogram of counts for each repetition level + uint8_t* page_data; //!< Ptr to uncompressed page + uint8_t* compressed_data; //!< Ptr to compressed page + EncColumnChunk* chunk; //!< Chunk that this page belongs to + cudf::io::detail::compression_result* comp_res; //!< Ptr to compression result + uint32_t* def_histogram; //!< Histogram of counts for each definition level + uint32_t* rep_histogram; //!< Histogram of counts for each repetition level // put this here in case it's ever made 64-bit encode_kernel_mask kernel_mask; //!< Mask used to control which encoding kernels to run // the rest can be 4 byte aligned @@ -1023,7 +1023,7 @@ void EncodePages(device_span pages, bool write_v2_headers, device_span> comp_in, device_span> comp_out, - device_span comp_res, + device_span comp_res, rmm::cuda_stream_view stream); /** @@ -1046,7 +1046,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view * @param[in] stream CUDA stream to use */ void EncodePageHeaders(device_span pages, - device_span comp_res, + device_span comp_res, device_span page_stats, statistics_chunk const* chunk_stats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 27312a4da89..933be889b1a 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -15,6 +15,8 @@ */ #include "compact_protocol_reader.hpp" +#include "io/comp/comp.hpp" +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/utilities/time_utils.cuh" #include "reader_impl.hpp" @@ -44,6 +46,10 @@ namespace cudf::io::parquet::detail { namespace { +namespace nvcomp = cudf::io::detail::nvcomp; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; + struct split_info { row_range rows; int64_t split_pos; @@ -795,14 +801,16 @@ std::vector compute_page_splits_by_row(device_span 0) { - debrotli_scratch.resize(get_gpu_debrotli_scratch_size(codec.num_pages), stream); + debrotli_scratch.resize(cudf::io::detail::get_gpu_debrotli_scratch_size(codec.num_pages), + stream); } } // Dispatch batches of pages to decompress for each codec. // Buffer needs to be padded, required by `gpuDecodePageData`. rmm::device_buffer decomp_pages( - cudf::util::round_up_safe(total_decomp_size, BUFFER_PADDING_MULTIPLE), stream); + cudf::util::round_up_safe(total_decomp_size, cudf::io::detail::BUFFER_PADDING_MULTIPLE), + stream); auto comp_in = cudf::detail::make_empty_host_vector>(num_comp_pages, stream); @@ -874,8 +882,11 @@ std::vector compute_page_splits_by_row(device_span compute_page_splits_by_row(device_span @@ -251,8 +252,8 @@ void generate_depth_remappings( if (source->is_device_read_preferred(io_size)) { // Buffer needs to be padded. // Required by `gpuDecodePageData`. - page_data[chunk] = - rmm::device_buffer(cudf::util::round_up_safe(io_size, BUFFER_PADDING_MULTIPLE), stream); + page_data[chunk] = rmm::device_buffer( + cudf::util::round_up_safe(io_size, cudf::io::detail::BUFFER_PADDING_MULTIPLE), stream); auto fut_read_size = source->device_read_async( io_offset, io_size, static_cast(page_data[chunk].data()), stream); read_tasks.emplace_back(std::move(fut_read_size)); @@ -261,7 +262,8 @@ void generate_depth_remappings( // Buffer needs to be padded. // Required by `gpuDecodePageData`. page_data[chunk] = rmm::device_buffer( - cudf::util::round_up_safe(read_buffer->size(), BUFFER_PADDING_MULTIPLE), stream); + cudf::util::round_up_safe(read_buffer->size(), cudf::io::detail::BUFFER_PADDING_MULTIPLE), + stream); CUDF_CUDA_TRY(cudaMemcpyAsync(page_data[chunk].data(), read_buffer->data(), read_buffer->size(), diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 6db92462498..6b1a20701f9 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -23,6 +23,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" #include "interop/decimal_conversion_utilities.cuh" +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/parquet/parquet.hpp" #include "io/parquet/parquet_gpu.hpp" diff --git a/cpp/src/io/parquet/writer_impl_helpers.cpp b/cpp/src/io/parquet/writer_impl_helpers.cpp index 396d44c0763..f15ea1f3c37 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.cpp +++ b/cpp/src/io/parquet/writer_impl_helpers.cpp @@ -21,6 +21,8 @@ #include "writer_impl_helpers.hpp" +#include "io/comp/nvcomp_adapter.hpp" + #include #include #include diff --git a/cpp/src/io/parquet/writer_impl_helpers.hpp b/cpp/src/io/parquet/writer_impl_helpers.hpp index a85411594e9..14a9a0ed5b7 100644 --- a/cpp/src/io/parquet/writer_impl_helpers.hpp +++ b/cpp/src/io/parquet/writer_impl_helpers.hpp @@ -20,11 +20,11 @@ */ #pragma once -#include "io/comp/nvcomp_adapter.hpp" #include "parquet_common.hpp" #include #include +#include namespace cudf::io::parquet::detail { @@ -42,7 +42,7 @@ Compression to_parquet_compression(compression_type compression); * @param codec Compression codec * @return Translated nvcomp compression type */ -nvcomp::compression_type to_nvcomp_compression_type(Compression codec); +cudf::io::detail::nvcomp::compression_type to_nvcomp_compression_type(Compression codec); /** * @brief Function that computes input alignment requirements for the given compression type. diff --git a/cpp/src/io/text/bgzip_data_chunk_source.cu b/cpp/src/io/text/bgzip_data_chunk_source.cu index 06069630685..162da62ef03 100644 --- a/cpp/src/io/text/bgzip_data_chunk_source.cu +++ b/cpp/src/io/text/bgzip_data_chunk_source.cu @@ -14,6 +14,7 @@ * limitations under the License. */ +#include "io/comp/gpuinflate.hpp" #include "io/comp/nvcomp_adapter.hpp" #include "io/text/device_data_chunks.hpp" @@ -41,6 +42,8 @@ namespace cudf::io::text { namespace { +namespace nvcomp = cudf::io::detail::nvcomp; + /** * @brief Transforms offset tuples of the form [compressed_begin, compressed_end, * decompressed_begin, decompressed_end] into span tuples of the form [compressed_device_span, @@ -73,7 +76,8 @@ class bgzip_data_chunk_reader : public data_chunk_reader { { // Buffer needs to be padded. // Required by `inflate_kernel`. - device.resize(cudf::util::round_up_safe(host.size(), BUFFER_PADDING_MULTIPLE), stream); + device.resize(cudf::util::round_up_safe(host.size(), cudf::io::detail::BUFFER_PADDING_MULTIPLE), + stream); cudf::detail::cuda_memcpy_async( device_span{device}.subspan(0, host.size()), host, stream); } @@ -94,7 +98,7 @@ class bgzip_data_chunk_reader : public data_chunk_reader { rmm::device_uvector d_decompressed_offsets; rmm::device_uvector> d_compressed_spans; rmm::device_uvector> d_decompressed_spans; - rmm::device_uvector d_decompression_results; + rmm::device_uvector d_decompression_results; std::size_t compressed_size_with_headers{}; std::size_t max_decompressed_size{}; // this is usually equal to decompressed_size() @@ -152,16 +156,16 @@ class bgzip_data_chunk_reader : public data_chunk_reader { gpuinflate(d_compressed_spans, d_decompressed_spans, d_decompression_results, - gzip_header_included::NO, + cudf::io::detail::gzip_header_included::NO, stream); } else { - cudf::io::nvcomp::batched_decompress(cudf::io::nvcomp::compression_type::DEFLATE, - d_compressed_spans, - d_decompressed_spans, - d_decompression_results, - max_decompressed_size, - decompressed_size(), - stream); + nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, + d_compressed_spans, + d_decompressed_spans, + d_decompression_results, + max_decompressed_size, + decompressed_size(), + stream); } } is_decompressed = true; diff --git a/cpp/tests/io/comp/decomp_test.cpp b/cpp/tests/io/comp/decomp_test.cpp index 54262dc3b44..5bbe8b63c47 100644 --- a/cpp/tests/io/comp/decomp_test.cpp +++ b/cpp/tests/io/comp/decomp_test.cpp @@ -30,6 +30,9 @@ #include using cudf::device_span; +using cudf::io::detail::compression_result; +using cudf::io::detail::compression_status; +namespace nvcomp = cudf::io::detail::nvcomp; /** * @brief Base test fixture for decompression @@ -61,7 +64,7 @@ struct DecompressTest : public cudf::test::BaseFixture { inf_out[0] = dst; inf_out.host_to_device_async(stream); - cudf::detail::hostdevice_vector inf_stat(1, stream); + cudf::detail::hostdevice_vector inf_stat(1, stream); inf_stat[0] = {}; inf_stat.host_to_device_async(stream); @@ -69,7 +72,7 @@ struct DecompressTest : public cudf::test::BaseFixture { CUDF_CUDA_TRY(cudaMemcpyAsync( decompressed.data(), dst.data(), dst.size(), cudaMemcpyDefault, stream.value())); inf_stat.device_to_host_sync(stream); - ASSERT_EQ(inf_stat[0].status, cudf::io::compression_status::SUCCESS); + ASSERT_EQ(inf_stat[0].status, compression_status::SUCCESS); } }; @@ -79,13 +82,13 @@ struct DecompressTest : public cudf::test::BaseFixture { struct GzipDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { - cudf::io::gpuinflate(d_inf_in, - d_inf_out, - d_inf_stat, - cudf::io::gzip_header_included::YES, - cudf::get_default_stream()); + cudf::io::detail::gpuinflate(d_inf_in, + d_inf_out, + d_inf_stat, + cudf::io::detail::gzip_header_included::YES, + cudf::get_default_stream()); } }; @@ -95,9 +98,9 @@ struct GzipDecompressTest : public DecompressTest { struct SnappyDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { - cudf::io::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, cudf::get_default_stream()); + cudf::io::detail::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, cudf::get_default_stream()); } }; @@ -107,17 +110,17 @@ struct SnappyDecompressTest : public DecompressTest { struct BrotliDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { - rmm::device_buffer d_scratch{cudf::io::get_gpu_debrotli_scratch_size(1), + rmm::device_buffer d_scratch{cudf::io::detail::get_gpu_debrotli_scratch_size(1), cudf::get_default_stream()}; - cudf::io::gpu_debrotli(d_inf_in, - d_inf_out, - d_inf_stat, - d_scratch.data(), - d_scratch.size(), - cudf::get_default_stream()); + cudf::io::detail::gpu_debrotli(d_inf_in, + d_inf_out, + d_inf_stat, + d_scratch.data(), + d_scratch.size(), + cudf::get_default_stream()); } }; @@ -181,8 +184,8 @@ TEST_F(BrotliDecompressTest, HelloWorld) TEST_F(NvcompConfigTest, Compression) { - using cudf::io::nvcomp::compression_type; - auto const& comp_disabled = cudf::io::nvcomp::is_compression_disabled; + using nvcomp::compression_type; + auto const& comp_disabled = nvcomp::is_compression_disabled; EXPECT_FALSE(comp_disabled(compression_type::DEFLATE, {true, true})); // all integrations enabled required @@ -201,8 +204,8 @@ TEST_F(NvcompConfigTest, Compression) TEST_F(NvcompConfigTest, Decompression) { - using cudf::io::nvcomp::compression_type; - auto const& decomp_disabled = cudf::io::nvcomp::is_decompression_disabled; + using nvcomp::compression_type; + auto const& decomp_disabled = nvcomp::is_decompression_disabled; EXPECT_FALSE(decomp_disabled(compression_type::DEFLATE, {true, true})); // all integrations enabled required diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index fce99187516..2209a30149d 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -40,6 +40,8 @@ #include #include +namespace nvcomp = cudf::io::detail::nvcomp; + template using column_wrapper = std::conditional_t, @@ -1135,7 +1137,7 @@ TEST_F(OrcReaderTest, SingleInputs) TEST_F(OrcReaderTest, zstdCompressionRegression) { - if (cudf::io::nvcomp::is_decompression_disabled(cudf::io::nvcomp::compression_type::ZSTD)) { + if (nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD)) { GTEST_SKIP() << "Newer nvCOMP version is required"; } @@ -1700,8 +1702,8 @@ TEST_F(OrcMetadataReaderTest, TestNested) TEST_F(OrcReaderTest, ZstdMaxCompressionRate) { - if (cudf::io::nvcomp::is_decompression_disabled(cudf::io::nvcomp::compression_type::ZSTD) or - cudf::io::nvcomp::is_compression_disabled(cudf::io::nvcomp::compression_type::ZSTD)) { + if (nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD) or + nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD)) { GTEST_SKIP() << "Newer nvCOMP version is required"; }