From 578e65f09c1eb7c3fe1c600590b26acccca59a5d Mon Sep 17 00:00:00 2001
From: Vukasin Milovanovic
Date: Mon, 12 Sep 2022 14:18:38 -0700
Subject: [PATCH] Enable ZSTD compression in ORC and Parquet writers (#11551)

Closes https://github.com/rapidsai/cudf/issues/9058,
https://github.com/rapidsai/cudf/issues/9056

Expands the nvCOMP adapter to include ZSTD compression.
Adds a centralized nvCOMP policy utility, `is_compression_enabled`.
Adds a centralized nvCOMP alignment utility, `compress_input_alignment_bits`.
Adds a centralized nvCOMP utility to get the maximum supported compression
chunk size - `batched_compress_max_allowed_chunk_size`.
Encoded ORC row groups are aligned based on compression requirements.
Encoded Parquet pages are aligned based on compression requirements.
Parquet fragment size now scales with the page size to better fit the default
page size with ZSTD compression.
Small refactoring around `decompress_status` for improved type safety and,
hopefully, better naming.
Replaced `snappy_compress` in the Parquet writer with the nvCOMP adapter call.
Vectors of `compression_result`s are initialized before compression to avoid
issues with random chunk skipping due to uninitialized memory.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Jason Lowe (https://github.com/jlowe)
  - Jim Brennan (https://github.com/jbrennan333)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Tobias Ribizel (https://github.com/upsj)
  - Matthew Roeschke (https://github.com/mroeschke)

URL: https://github.com/rapidsai/cudf/pull/11551
---
 cpp/src/io/avro/reader_impl.cu                |  12 +-
 cpp/src/io/comp/debrotli.cu                   |  15 +-
 cpp/src/io/comp/gpuinflate.cu                 |  20 +-
 cpp/src/io/comp/gpuinflate.hpp                |  32 ++--
 cpp/src/io/comp/nvcomp_adapter.cpp            | 147 +++++++++++---
 cpp/src/io/comp/nvcomp_adapter.cu             |  82 +++++---
 cpp/src/io/comp/nvcomp_adapter.cuh            |  28 ++-
 cpp/src/io/comp/nvcomp_adapter.hpp            |  49 ++++-
 cpp/src/io/comp/snap.cu                       |  13 +-
 cpp/src/io/comp/uncomp.cpp                    |   6 +-
 cpp/src/io/comp/unsnap.cu                     |  15 +-
 cpp/src/io/orc/orc.hpp                        |  12 +-
 cpp/src/io/orc/orc_common.hpp                 |   7 -
 cpp/src/io/orc/orc_gpu.hpp                    |  22 +--
 cpp/src/io/orc/reader_impl.cu                 |  36 ++--
 cpp/src/io/orc/stripe_enc.cu                  |  85 +++++----
 cpp/src/io/orc/stripe_init.cu                 |  14 +-
 cpp/src/io/orc/writer_impl.cu                 | 130 +++++++++----
 cpp/src/io/orc/writer_impl.hpp                |   2 +-
 cpp/src/io/parquet/page_enc.cu                |  54 +++---
 cpp/src/io/parquet/parquet_gpu.hpp            |  19 +-
 cpp/src/io/parquet/reader_impl.cu             |  33 ++--
 cpp/src/io/parquet/writer_impl.cu             | 179 ++++++++----------
 cpp/src/io/parquet/writer_impl.hpp            |   2 +-
 cpp/tests/io/comp/decomp_test.cpp             |  10 +-
 .../java/ai/rapids/cudf/CompressionType.java  |  20 +-
 python/cudf/cudf/_lib/orc.pyx                 |   2 +
 python/cudf/cudf/_lib/parquet.pyx             |   2 +
 python/cudf/cudf/tests/test_orc.py            |  19 +-
 python/cudf/cudf/tests/test_parquet.py        |  20 ++
 python/cudf/cudf/utils/ioutils.py             |   4 +-
 31 files changed, 686 insertions(+), 405 deletions(-)

diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu
index e5b73dc9360..7fcdf1bf29a 100644
--- a/cpp/src/io/avro/reader_impl.cu
+++ b/cpp/src/io/avro/reader_impl.cu
@@ -164,7 +164,11 @@ rmm::device_buffer decompress_data(datasource& source,
   if (meta.codec == "deflate") {
     auto inflate_in  = hostdevice_vector<device_span<uint8_t const>>(meta.block_list.size(), stream);
     auto inflate_out = hostdevice_vector<device_span<uint8_t>>(meta.block_list.size(), stream);
-    auto inflate_stats = hostdevice_vector<decompress_status>(meta.block_list.size(), stream);
+    auto inflate_stats = hostdevice_vector<compression_result>(meta.block_list.size(), stream);
+
thrust::fill(rmm::exec_policy(stream), + inflate_stats.d_begin(), + inflate_stats.d_end(), + compression_result{0, compression_status::FAILURE}); // Guess an initial maximum uncompressed block size. We estimate the compression factor is two // and round up to the next multiple of 4096 bytes. @@ -190,8 +194,6 @@ rmm::device_buffer decompress_data(datasource& source, for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) { inflate_out.host_to_device(stream); - CUDF_CUDA_TRY(cudaMemsetAsync( - inflate_stats.device_ptr(), 0, inflate_stats.memory_size(), stream.value())); gpuinflate(inflate_in, inflate_out, inflate_stats, gzip_header_included::NO, stream); inflate_stats.device_to_host(stream, true); @@ -204,9 +206,9 @@ rmm::device_buffer decompress_data(datasource& source, inflate_stats.begin(), std::back_inserter(actual_uncomp_sizes), [](auto const& inf_out, auto const& inf_stats) { - // If error status is 1 (buffer too small), the `bytes_written` field + // If error status is OUTPUT_OVERFLOW, the `bytes_written` field // actually contains the uncompressed data size - return inf_stats.status == 1 + return inf_stats.status == compression_status::OUTPUT_OVERFLOW ? std::max(inf_out.size(), inf_stats.bytes_written) : inf_out.size(); }); diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index 07dc2cc9870..b6f2d2db811 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -1906,7 +1906,7 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction * * @param[in] inputs Source buffer per block * @param[out] outputs Destination buffer per block - * @param[out] statuses Decompressor status per block + * @param[out] results Decompressor status per block * @param scratch Intermediate device memory heap space (will be dynamically shared between blocks) * @param scratch_size Size of scratch heap space (smaller sizes may result in serialization between * blocks) @@ -1914,7 +1914,7 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction __global__ void __launch_bounds__(block_size, 2) gpu_debrotli_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, uint8_t* scratch, uint32_t scratch_size) { @@ -2016,10 +2016,11 @@ __global__ void __launch_bounds__(block_size, 2) __syncthreads(); // Output decompression status if (!t) { - statuses[block_id].bytes_written = s->out - s->outbase; - statuses[block_id].status = s->error; + results[block_id].bytes_written = s->out - s->outbase; + results[block_id].status = + (s->error == 0) ? 
compression_status::SUCCESS : compression_status::FAILURE; // Return ext heap used by last block (statistics) - statuses[block_id].reserved = s->fb_size; + results[block_id].reserved = s->fb_size; } } @@ -2079,7 +2080,7 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs) void gpu_debrotli(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, void* scratch, size_t scratch_size, rmm::cuda_stream_view stream) @@ -2104,7 +2105,7 @@ void gpu_debrotli(device_span const> inputs, cudaMemcpyHostToDevice, stream.value())); gpu_debrotli_kernel<<>>( - inputs, outputs, statuses, scratch_u8, fb_heap_size); + inputs, outputs, results, scratch_u8, fb_heap_size); #if DUMP_FB_HEAP uint32_t dump[2]; uint32_t cur = 0; diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 16f4ea84f7f..dacc5a00d16 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -1020,14 +1020,14 @@ __device__ int parse_gzip_header(const uint8_t* src, size_t src_size) * @tparam block_size Thread block dimension for this call * @param inputs Source and destination buffer information per block * @param outputs Destination buffer information per block - * @param statuses Decompression status buffer per block + * @param results Decompression status buffer per block * @param parse_hdr If nonzero, indicates that the compressed bitstream includes a GZIP header */ template __global__ void __launch_bounds__(block_size) inflate_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, gzip_header_included parse_hdr) { __shared__ __align__(16) inflate_state_s state_g; @@ -1133,9 +1133,15 @@ __global__ void __launch_bounds__(block_size) // Output buffer too small state->err = 1; } - statuses[z].bytes_written = state->out - state->outbase; - statuses[z].status = state->err; - statuses[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes + results[z].bytes_written = state->out - state->outbase; + results[z].status = [&]() { + switch (state->err) { + case 0: return compression_status::SUCCESS; + case 1: return compression_status::OUTPUT_OVERFLOW; + default: return compression_status::FAILURE; + } + }(); + results[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes } } @@ -1200,14 +1206,14 @@ __global__ void __launch_bounds__(1024) void gpuinflate(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, gzip_header_included parse_hdr, rmm::cuda_stream_view stream) { constexpr int block_size = 128; // Threads per block if (inputs.size() > 0) { inflate_kernel - <<>>(inputs, outputs, statuses, parse_hdr); + <<>>(inputs, outputs, results, parse_hdr); } } diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 3870b2ac3b3..1b45a31b13b 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -26,11 +26,21 @@ namespace cudf { namespace io { /** - * @brief Output parameters for the decompression interface + * @brief Status of a compression/decompression operation. */ -struct decompress_status { +enum class compression_status : uint8_t { + SUCCESS, ///< Successful, output is valid + FAILURE, ///< Failed, output is invalid (e.g. 
input is unsupported in some way) + SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used) + OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output +}; + +/** + * @brief Descriptor of compression/decompression result. + */ +struct compression_result { uint64_t bytes_written; - uint32_t status; + compression_status status; uint32_t reserved; }; @@ -44,13 +54,13 @@ enum class gzip_header_included { NO, YES }; * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] parse_hdr Whether or not to parse GZIP header * @param[in] stream CUDA stream to use */ void gpuinflate(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, gzip_header_included parse_hdr, rmm::cuda_stream_view stream); @@ -73,12 +83,12 @@ void gpu_copy_uncompressed_blocks(device_span const> * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] stream CUDA stream to use */ void gpu_unsnap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream); /** @@ -98,14 +108,14 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs = 0); * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] scratch Temporary memory for intermediate work * @param[in] scratch_size Size in bytes of the temporary memory * @param[in] stream CUDA stream to use */ void gpu_debrotli(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, void* scratch, size_t scratch_size, rmm::cuda_stream_view stream); @@ -118,12 +128,12 @@ void gpu_debrotli(device_span const> inputs, * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] stream CUDA stream to use */ void gpu_snap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream); } // namespace io diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index 91deda50cf2..31f7b9b472e 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -21,17 +21,29 @@ #include +#define NVCOMP_DEFLATE_HEADER +#if __has_include(NVCOMP_DEFLATE_HEADER) +#include NVCOMP_DEFLATE_HEADER +#endif + #define NVCOMP_ZSTD_HEADER #if __has_include(NVCOMP_ZSTD_HEADER) #include NVCOMP_ZSTD_HEADER -#define NVCOMP_HAS_ZSTD 1 +#endif + +#if NVCOMP_MAJOR_VERSION > 2 or (NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION >= 3) +#define NVCOMP_HAS_ZSTD_DECOMP 1 #else -#define NVCOMP_HAS_ZSTD 0 +#define NVCOMP_HAS_ZSTD_DECOMP 0 #endif -#define NVCOMP_DEFLATE_HEADER -#if __has_include(NVCOMP_DEFLATE_HEADER) -#include NVCOMP_DEFLATE_HEADER +#if NVCOMP_MAJOR_VERSION > 2 or (NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION >= 4) +#define NVCOMP_HAS_ZSTD_COMP 1 +#else +#define NVCOMP_HAS_ZSTD_COMP 0 +#endif + +#if NVCOMP_MAJOR_VERSION > 2 or 
(NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION >= 3) #define NVCOMP_HAS_DEFLATE 1 #else #define NVCOMP_HAS_DEFLATE 0 @@ -63,7 +75,7 @@ nvcompStatus_t batched_decompress_get_temp_size_ex(compression_type compression, case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressGetTempSizeEx(std::forward(args)...); case compression_type::ZSTD: -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP return nvcompBatchedZstdDecompressGetTempSizeEx(std::forward(args)...); #else CUDF_FAIL("Unsupported compression type"); @@ -83,7 +95,7 @@ auto batched_decompress_get_temp_size(compression_type compression, Args&&... ar case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressGetTempSize(std::forward(args)...); case compression_type::ZSTD: -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP return nvcompBatchedZstdDecompressGetTempSize(std::forward(args)...); #else CUDF_FAIL("Unsupported compression type"); @@ -106,7 +118,7 @@ auto batched_decompress_async(compression_type compression, Args&&... args) case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressAsync(std::forward(args)...); case compression_type::ZSTD: -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP return nvcompBatchedZstdDecompressAsync(std::forward(args)...); #else CUDF_FAIL("Unsupported compression type"); @@ -146,14 +158,14 @@ size_t batched_decompress_temp_size(compression_type compression, void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, size_t max_uncomp_chunk_size, size_t max_total_uncomp_size, rmm::cuda_stream_view stream) { // TODO Consolidate config use to a common location if (compression == compression_type::ZSTD) { -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP #if NVCOMP_ZSTD_IS_EXPERIMENTAL CUDF_EXPECTS(cudf::io::detail::nvcomp_integration::is_all_enabled(), "Zstandard compression is experimental, you can enable it through " @@ -187,7 +199,7 @@ void batched_decompress(compression_type compression, stream.value()); CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "unable to perform decompression"); - convert_status(nvcomp_statuses, actual_uncompressed_data_sizes, statuses, stream); + update_compression_results(nvcomp_statuses, actual_uncompressed_data_sizes, results, stream); } // Dispatcher for nvcompBatchedCompressGetTempSize @@ -210,7 +222,14 @@ auto batched_compress_temp_size(compression_type compression, #else CUDF_FAIL("Unsupported compression type"); #endif - case compression_type::ZSTD: [[fallthrough]]; + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + nvcomp_status = nvcompBatchedZstdCompressGetTempSize( + batch_size, max_uncompressed_chunk_bytes, nvcompBatchedZstdDefaultOpts, &temp_size); + break; +#else + CUDF_FAIL("Unsupported compression type"); +#endif default: CUDF_FAIL("Unsupported compression type"); } @@ -219,26 +238,36 @@ auto batched_compress_temp_size(compression_type compression, return temp_size; } -// Dispatcher for nvcompBatchedCompressGetMaxOutputChunkSize -size_t batched_compress_get_max_output_chunk_size(compression_type compression, - uint32_t max_uncompressed_chunk_bytes) +size_t compress_max_output_chunk_size(compression_type compression, + uint32_t max_uncompressed_chunk_bytes) { + auto const capped_uncomp_bytes = std::min( + compress_max_allowed_chunk_size(compression).value_or(max_uncompressed_chunk_bytes), + max_uncompressed_chunk_bytes); + size_t max_comp_chunk_size = 0; nvcompStatus_t status = nvcompStatus_t::nvcompSuccess; switch 
(compression) { case compression_type::SNAPPY: status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - max_uncompressed_chunk_bytes, nvcompBatchedSnappyDefaultOpts, &max_comp_chunk_size); + capped_uncomp_bytes, nvcompBatchedSnappyDefaultOpts, &max_comp_chunk_size); break; case compression_type::DEFLATE: #if NVCOMP_HAS_DEFLATE status = nvcompBatchedDeflateCompressGetMaxOutputChunkSize( - max_uncompressed_chunk_bytes, nvcompBatchedDeflateDefaultOpts, &max_comp_chunk_size); + capped_uncomp_bytes, nvcompBatchedDeflateDefaultOpts, &max_comp_chunk_size); + break; +#else + CUDF_FAIL("Unsupported compression type"); +#endif + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + status = nvcompBatchedZstdCompressGetMaxOutputChunkSize( + capped_uncomp_bytes, nvcompBatchedZstdDefaultOpts, &max_comp_chunk_size); break; #else CUDF_FAIL("Unsupported compression type"); #endif - case compression_type::ZSTD: [[fallthrough]]; default: CUDF_FAIL("Unsupported compression type"); } @@ -289,26 +318,50 @@ static void batched_compress_async(compression_type compression, #else CUDF_FAIL("Unsupported compression type"); #endif - case compression_type::ZSTD: [[fallthrough]]; + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + nvcomp_status = nvcompBatchedZstdCompressAsync(device_uncompressed_ptrs, + device_uncompressed_bytes, + max_uncompressed_chunk_bytes, + batch_size, + device_temp_ptr, + temp_bytes, + device_compressed_ptrs, + device_compressed_bytes, + nvcompBatchedZstdDefaultOpts, + stream.value()); + break; +#else + CUDF_FAIL("Unsupported compression type"); +#endif default: CUDF_FAIL("Unsupported compression type"); } CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in compression"); } +bool is_aligned(void const* ptr, std::uintptr_t alignment) noexcept +{ + return (reinterpret_cast(ptr) % alignment) == 0; +} + void batched_compress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, - uint32_t max_uncomp_chunk_size, + device_span results, rmm::cuda_stream_view stream) { auto const num_chunks = inputs.size(); + auto nvcomp_args = create_batched_nvcomp_args(inputs, outputs, stream); + + auto const max_uncomp_chunk_size = skip_unsupported_inputs( + nvcomp_args.input_data_sizes, results, compress_max_allowed_chunk_size(compression), stream); + auto const temp_size = batched_compress_temp_size(compression, num_chunks, max_uncomp_chunk_size); rmm::device_buffer scratch(temp_size, stream); + CUDF_EXPECTS(is_aligned(scratch.data(), 8), "Compression failed, misaligned scratch buffer"); rmm::device_uvector actual_compressed_data_sizes(num_chunks, stream); - auto const nvcomp_args = create_batched_nvcomp_args(inputs, outputs, stream); batched_compress_async(compression, nvcomp_args.input_data_ptrs.data(), @@ -321,7 +374,55 @@ void batched_compress(compression_type compression, actual_compressed_data_sizes.data(), stream.value()); - convert_status(std::nullopt, actual_compressed_data_sizes, statuses, stream); + update_compression_results(actual_compressed_data_sizes, results, stream); +} + +bool is_compression_enabled(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: + return NVCOMP_HAS_DEFLATE and detail::nvcomp_integration::is_all_enabled(); + case compression_type::SNAPPY: return detail::nvcomp_integration::is_stable_enabled(); + case compression_type::ZSTD: + return NVCOMP_HAS_ZSTD_COMP and detail::nvcomp_integration::is_all_enabled(); + default: return false; + } + return false; +} 
+ +size_t compress_input_alignment_bits(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: return 0; + case compression_type::SNAPPY: return 0; + case compression_type::ZSTD: return 2; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +size_t compress_output_alignment_bits(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: return 3; + case compression_type::SNAPPY: return 0; + case compression_type::ZSTD: return 0; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +std::optional compress_max_allowed_chunk_size(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: return 64 * 1024; + case compression_type::SNAPPY: return std::nullopt; + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + return nvcompZstdCompressionMaxAllowedChunkSize; +#else + CUDF_FAIL("Unsupported compression type"); +#endif + default: return std::nullopt; + } } } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cu b/cpp/src/io/comp/nvcomp_adapter.cu index 30551dc31cf..c3c1bff9073 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cu +++ b/cpp/src/io/comp/nvcomp_adapter.cu @@ -57,31 +57,69 @@ batched_args create_batched_nvcomp_args(device_span c std::move(output_data_sizes)}; } -void convert_status(std::optional> nvcomp_stats, - device_span actual_uncompressed_sizes, - device_span cudf_stats, - rmm::cuda_stream_view stream) +void update_compression_results(device_span nvcomp_stats, + device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream) { - if (nvcomp_stats.has_value()) { - thrust::transform( + thrust::transform_if( + rmm::exec_policy(stream), + nvcomp_stats.begin(), + nvcomp_stats.end(), + actual_output_sizes.begin(), + results.begin(), + results.begin(), + [] __device__(auto const& nvcomp_status, auto const& size) { + return compression_result{size, + nvcomp_status == nvcompStatus_t::nvcompSuccess + ? compression_status::SUCCESS + : compression_status::FAILURE}; + }, + [] __device__(auto const& cudf_status) { + return cudf_status.status != compression_status::SKIPPED; + }); +} + +void update_compression_results(device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream) +{ + thrust::transform_if( + rmm::exec_policy(stream), + actual_output_sizes.begin(), + actual_output_sizes.end(), + results.begin(), + results.begin(), + [] __device__(auto const& size) { return compression_result{size}; }, + [] __device__(auto const& results) { return results.status != compression_status::SKIPPED; }); +} + +size_t skip_unsupported_inputs(device_span input_sizes, + device_span results, + std::optional max_valid_input_size, + rmm::cuda_stream_view stream) +{ + if (max_valid_input_size.has_value()) { + auto status_size_it = thrust::make_zip_iterator(input_sizes.begin(), results.begin()); + thrust::transform_if( rmm::exec_policy(stream), - nvcomp_stats->begin(), - nvcomp_stats->end(), - actual_uncompressed_sizes.begin(), - cudf_stats.begin(), - [] __device__(auto const& status, auto const& size) { - return decompress_status{size, status == nvcompStatus_t::nvcompSuccess ? 
0u : 1u}; + results.begin(), + results.end(), + input_sizes.begin(), + status_size_it, + [] __device__(auto const& status) { + return thrust::pair{0, compression_result{0, compression_status::SKIPPED}}; + }, + [max_size = max_valid_input_size.value()] __device__(size_t input_size) { + return input_size > max_size; }); - } else { - thrust::transform(rmm::exec_policy(stream), - actual_uncompressed_sizes.begin(), - actual_uncompressed_sizes.end(), - cudf_stats.begin(), - [] __device__(size_t size) { - decompress_status status{}; - status.bytes_written = size; - return status; - }); } + + return thrust::reduce(rmm::exec_policy(stream), + input_sizes.begin(), + input_sizes.end(), + 0ul, + thrust::maximum()); } + } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cuh b/cpp/src/io/comp/nvcomp_adapter.cuh index 1cc65d41a51..e49a9a6d348 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cuh +++ b/cpp/src/io/comp/nvcomp_adapter.cuh @@ -48,10 +48,28 @@ batched_args create_batched_nvcomp_args(device_span c rmm::cuda_stream_view stream); /** - * @brief Convert nvcomp statuses into cuIO compression statuses. + * @brief Convert nvcomp statuses and output sizes into cuIO compression results. */ -void convert_status(std::optional> nvcomp_stats, - device_span actual_uncompressed_sizes, - device_span cudf_stats, - rmm::cuda_stream_view stream); +void update_compression_results(device_span nvcomp_stats, + device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Fill the result array based on the actual output sizes. + */ +void update_compression_results(device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Mark unsupported input chunks for skipping. + * + * Returns the size of the largest remaining input chunk. + */ +size_t skip_unsupported_inputs(device_span input_sizes, + device_span results, + std::optional max_valid_input_size, + rmm::cuda_stream_view stream); + } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp index 40a85a3ac37..41af564ca76 100644 --- a/cpp/src/io/comp/nvcomp_adapter.hpp +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -18,6 +18,7 @@ #include "gpuinflate.hpp" +#include #include #include @@ -26,13 +27,23 @@ namespace cudf::io::nvcomp { enum class compression_type { SNAPPY, ZSTD, DEFLATE }; +/** + * @brief Whether the given compression type is enabled through nvCOMP. + * + * Result depends on nvCOMP version and environment variables. + * + * @param compression Compression type + * @returns true if nvCOMP use is enabled; false otherwise + */ +[[nodiscard]] bool is_compression_enabled(compression_type compression); + /** * @brief Device batch decompression of given type. 
* * @param[in] compression Compression type * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] max_uncomp_chunk_size maximum size of uncompressed chunk * @param[in] max_total_uncomp_size maximum total size of uncompressed data * @param[in] stream CUDA stream to use @@ -40,7 +51,7 @@ enum class compression_type { SNAPPY, ZSTD, DEFLATE }; void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, size_t max_uncomp_chunk_size, size_t max_total_uncomp_size, rmm::cuda_stream_view stream); @@ -51,8 +62,32 @@ void batched_decompress(compression_type compression, * @param compression Compression type * @param max_uncomp_chunk_size Size of the largest uncompressed chunk in the batch */ -size_t batched_compress_get_max_output_chunk_size(compression_type compression, - uint32_t max_uncomp_chunk_size); +[[nodiscard]] size_t compress_max_output_chunk_size(compression_type compression, + uint32_t max_uncomp_chunk_size); + +/** + * @brief Gets input alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment, in bits + */ +[[nodiscard]] size_t compress_input_alignment_bits(compression_type compression); + +/** + * @brief Gets output alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment, in bits + */ +[[nodiscard]] size_t compress_output_alignment_bits(compression_type compression); + +/** + * @brief Maximum size of uncompressed chunks that can be compressed with nvCOMP. + * + * @param compression Compression type + * @returns maximum chunk size + */ +[[nodiscard]] std::optional compress_max_allowed_chunk_size(compression_type compression); /** * @brief Device batch compression of given type. @@ -60,15 +95,13 @@ size_t batched_compress_get_max_output_chunk_size(compression_type compression, * @param[in] compression Compression type * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures - * @param[in] max_uncomp_chunk_size Size of the largest uncompressed chunk in the batch + * @param[out] results List of output status structures * @param[in] stream CUDA stream to use */ void batched_compress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, - uint32_t max_uncomp_chunk_size, + device_span results, rmm::cuda_stream_view stream); } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 820a7f937d7..6c7ab490751 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -260,7 +260,7 @@ static __device__ uint32_t Match60(const uint8_t* src1, __global__ void __launch_bounds__(128) snap_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses) + device_span results) { __shared__ __align__(16) snap_state_s state_g; @@ -337,21 +337,22 @@ __global__ void __launch_bounds__(128) } __syncthreads(); if (!t) { - statuses[blockIdx.x].bytes_written = s->dst - s->dst_base; - statuses[blockIdx.x].status = (s->dst > s->end) ? 1 : 0; - statuses[blockIdx.x].reserved = 0; + results[blockIdx.x].bytes_written = s->dst - s->dst_base; + results[blockIdx.x].status = + (s->dst > s->end) ? 
compression_status::FAILURE : compression_status::SUCCESS; + results[blockIdx.x].reserved = 0; } } void gpu_snap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block dim3 dim_grid(inputs.size(), 1); if (inputs.size() > 0) { - snap_kernel<<>>(inputs, outputs, statuses); + snap_kernel<<>>(inputs, outputs, results); } } diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 6f33c9f1de9..8e58f86317c 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -520,7 +520,9 @@ size_t decompress_zstd(host_span src, hd_dsts[0] = d_dst; hd_dsts.host_to_device(stream); - auto hd_stats = hostdevice_vector(1, stream); + auto hd_stats = hostdevice_vector(1, stream); + hd_stats[0] = compression_result{0, compression_status::FAILURE}; + hd_stats.host_to_device(stream); auto const max_uncomp_page_size = dst.size(); nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, hd_srcs, @@ -531,7 +533,7 @@ size_t decompress_zstd(host_span src, stream); hd_stats.device_to_host(stream, true); - CUDF_EXPECTS(hd_stats[0].status == 0, "ZSTD decompression failed"); + CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed"); // Copy temporary output to `dst` CUDF_CUDA_TRY(cudaMemcpyAsync( diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 98011a57ea8..8b13ddd1de4 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -627,7 +627,7 @@ template __global__ void __launch_bounds__(block_size) unsnap_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses) + device_span results) { __shared__ __align__(16) unsnap_state_s state_g; __shared__ cub::WarpReduce::TempStorage temp_storage; @@ -698,25 +698,26 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); } if (!t) { - statuses[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; - statuses[strm_id].status = s->error; + results[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; + results[strm_id].status = + (s->error == 0) ? 
compression_status::SUCCESS : compression_status::FAILURE; if (log_cyclecount) { - statuses[strm_id].reserved = clock() - s->tstart; + results[strm_id].reserved = clock() - s->tstart; } else { - statuses[strm_id].reserved = 0; + results[strm_id].reserved = 0; } } } void gpu_unsnap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block dim3 dim_grid(inputs.size(), 1); // TODO: Check max grid dimensions vs max expected count - unsnap_kernel<128><<>>(inputs, outputs, statuses); + unsnap_kernel<128><<>>(inputs, outputs, results); } } // namespace io diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 858f7682b11..a007750d264 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -38,12 +38,12 @@ namespace cudf { namespace io { namespace orc { struct PostScript { - uint64_t footerLength = 0; // the length of the footer section in bytes - CompressionKind compression = NONE; // the kind of generic compression used - uint32_t compressionBlockSize = 256 * 1024; // the maximum size of each compression chunk - std::vector version; // the version of the writer [major, minor] - uint64_t metadataLength = 0; // the length of the metadata section in bytes - std::string magic = ""; // the fixed string "ORC" + uint64_t footerLength = 0; // the length of the footer section in bytes + CompressionKind compression = NONE; // the kind of generic compression used + uint32_t compressionBlockSize{}; // the maximum size of each compression chunk + std::vector version; // the version of the writer [major, minor] + uint64_t metadataLength = 0; // the length of the metadata section in bytes + std::string magic = ""; // the fixed string "ORC" }; struct StripeInformation { diff --git a/cpp/src/io/orc/orc_common.hpp b/cpp/src/io/orc/orc_common.hpp index 29a4ad6ed78..c2898b362a6 100644 --- a/cpp/src/io/orc/orc_common.hpp +++ b/cpp/src/io/orc/orc_common.hpp @@ -24,13 +24,6 @@ namespace orc { static constexpr uint32_t block_header_size = 3; -constexpr uint32_t compressed_block_size(uint32_t compressed_data_size) -{ - return ((compressed_data_size + block_header_size + 0xFF) & ~0xFF); -} - -static constexpr uint32_t padded_block_header_size = compressed_block_size(0); - enum CompressionKind : uint8_t { NONE = 0, ZLIB = 1, diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 9de7dfffc0c..c7a7a423cf2 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -56,12 +56,12 @@ struct CompressedStreamInfo { } const uint8_t* compressed_data; // [in] base ptr to compressed stream data uint8_t* uncompressed_data; // [in] base ptr to uncompressed stream data or NULL if not known yet - size_t compressed_data_size; // [in] compressed data size for this stream - device_span* dec_in_ctl; // [in] input buffer to decompress - device_span* dec_out_ctl; // [in] output buffer to decompress into - device_span decstatus; // [in] results of decompression - device_span* copy_in_ctl; // [out] input buffer to copy - device_span* copy_out_ctl; // [out] output buffer to copy to + size_t compressed_data_size; // [in] compressed data size for this stream + device_span* dec_in_ctl; // [in] input buffer to decompress + device_span* dec_out_ctl; // [in] output buffer to decompress into + device_span dec_res; // [in] results of decompression + device_span* copy_in_ctl; // [out] input buffer to copy + device_span* copy_out_ctl; // [out] output buffer 
to copy to uint32_t num_compressed_blocks; // [in,out] number of entries in decctl(in), number of compressed // blocks(out) uint32_t num_uncompressed_blocks; // [in,out] number of entries in dec_in_ctl(in), number of @@ -348,11 +348,10 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @param[in] compression Type of compression * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression + * @param[in] comp_block_align Required alignment for compressed blocks * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in,out] enc_streams chunk streams device array [column][rowgroup] - * @param[out] comp_in Per-block compression input buffers - * @param[out] comp_out Per-block compression output buffers - * @param[out] comp_stat Per-block compression status + * @param[out] comp_res Per-block compression status * @param[in] stream CUDA stream used for device memory operations and kernel launches */ void CompressOrcDataStreams(uint8_t* compressed_data, @@ -360,11 +359,10 @@ void CompressOrcDataStreams(uint8_t* compressed_data, CompressionKind compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, + uint32_t comp_block_align, device_2dspan strm_desc, device_2dspan enc_streams, - device_span> comp_in, - device_span> comp_out, - device_span comp_stat, + device_span comp_res, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index c79aa5d7a4f..7ff3ee85939 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -262,26 +262,26 @@ auto decimal_column_type(std::vector const& decimal128_columns, } // namespace -__global__ void decompress_check_kernel(device_span stats, +__global__ void decompress_check_kernel(device_span results, bool* any_block_failure) { auto tid = blockIdx.x * blockDim.x + threadIdx.x; - if (tid < stats.size()) { - if (stats[tid].status != 0) { + if (tid < results.size()) { + if (results[tid].status != compression_status::SUCCESS) { *any_block_failure = true; // Doesn't need to be atomic } } } -void decompress_check(device_span stats, +void decompress_check(device_span results, bool* any_block_failure, rmm::cuda_stream_view stream) { - if (stats.empty()) { return; } // early exit for empty stats + if (results.empty()) { return; } // early exit for empty results dim3 block(128); - dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast(block.x))); - decompress_check_kernel<<>>(stats, any_block_failure); + dim3 grid(cudf::util::div_rounding_up_safe(results.size(), static_cast(block.x))); + decompress_check_kernel<<>>(results, any_block_failure); } rmm::device_buffer reader::impl::decompress_stripe_data( @@ -337,7 +337,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data( num_compressed_blocks + num_uncompressed_blocks, stream); rmm::device_uvector> inflate_out( num_compressed_blocks + num_uncompressed_blocks, stream); - rmm::device_uvector inflate_stats(num_compressed_blocks, stream); + rmm::device_uvector inflate_res(num_compressed_blocks, stream); + thrust::fill(rmm::exec_policy(stream), + inflate_res.begin(), + inflate_res.end(), + compression_result{0, compression_status::FAILURE}); // Parse again to populate the decompression input/output buffers size_t decomp_offset = 0; @@ -349,8 +353,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( compinfo[i].uncompressed_data = dst_base + decomp_offset; compinfo[i].dec_in_ctl = inflate_in.data() + start_pos; 
compinfo[i].dec_out_ctl = inflate_out.data() + start_pos; - compinfo[i].decstatus = {inflate_stats.data() + start_pos, compinfo[i].num_compressed_blocks}; - compinfo[i].copy_in_ctl = inflate_in.data() + start_pos_uncomp; + compinfo[i].dec_res = {inflate_res.data() + start_pos, compinfo[i].num_compressed_blocks}; + compinfo[i].copy_in_ctl = inflate_in.data() + start_pos_uncomp; compinfo[i].copy_out_ctl = inflate_out.data() + start_pos_uncomp; stream_info[i].dst_pos = decomp_offset; @@ -379,13 +383,13 @@ rmm::device_buffer reader::impl::decompress_stripe_data( nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, inflate_in_view, inflate_out_view, - inflate_stats, + inflate_res, max_uncomp_block_size, total_decomp_size, stream); } else { gpuinflate( - inflate_in_view, inflate_out_view, inflate_stats, gzip_header_included::NO, stream); + inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream); } break; case compression_type::SNAPPY: @@ -393,26 +397,26 @@ rmm::device_buffer reader::impl::decompress_stripe_data( nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, inflate_in_view, inflate_out_view, - inflate_stats, + inflate_res, max_uncomp_block_size, total_decomp_size, stream); } else { - gpu_unsnap(inflate_in_view, inflate_out_view, inflate_stats, stream); + gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream); } break; case compression_type::ZSTD: nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, inflate_in_view, inflate_out_view, - inflate_stats, + inflate_res, max_uncomp_block_size, total_decomp_size, stream); break; default: CUDF_FAIL("Unexpected decompression dispatch"); break; } - decompress_check(inflate_stats, any_block_failure.device_ptr(), stream); + decompress_check(inflate_res, any_block_failure.device_ptr(), stream); } if (num_uncompressed_blocks > 0) { device_span> copy_in_view{inflate_in.data() + num_compressed_blocks, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 5e9a6f8df6b..b1c04099e64 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -17,14 +17,16 @@ #include "orc_common.hpp" #include "orc_gpu.hpp" -#include -#include -#include #include #include #include #include +#include +#include +#include +#include + #include #include #include @@ -1142,10 +1144,11 @@ __global__ void __launch_bounds__(1024) * @param[in] chunks EncChunk device array [rowgroup][column] * @param[out] inputs Per-block compression input buffers * @param[out] outputs Per-block compression output buffers - * @param[out] statuses Per-block compression status + * @param[out] results Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression + * @param[in] comp_block_align Required alignment for compressed blocks */ // blockDim {256,1,1} __global__ void __launch_bounds__(256) @@ -1153,14 +1156,18 @@ __global__ void __launch_bounds__(256) device_2dspan streams, // const? 
device_span> inputs, device_span> outputs, - device_span statuses, + device_span results, uint8_t* compressed_bfr, uint32_t comp_blk_size, - uint32_t max_comp_blk_size) + uint32_t max_comp_blk_size, + uint32_t comp_block_align) { __shared__ __align__(16) StripeStream ss; __shared__ uint8_t* volatile uncomp_base_g; + auto const padded_block_header_size = util::round_up_unsafe(block_header_size, comp_block_align); + auto const padded_comp_block_size = util::round_up_unsafe(max_comp_blk_size, comp_block_align); + auto const stripe_id = blockIdx.x; auto const stream_id = blockIdx.y; uint32_t t = threadIdx.x; @@ -1177,10 +1184,10 @@ __global__ void __launch_bounds__(256) num_blocks = (ss.stream_size > 0) ? (ss.stream_size - 1) / comp_blk_size + 1 : 1; for (uint32_t b = t; b < num_blocks; b += 256) { uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); - inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size}; - auto const dst_offset = b * compressed_block_size(max_comp_blk_size) + padded_block_header_size; - outputs[ss.first_block + b] = {dst + dst_offset, max_comp_blk_size}; - statuses[ss.first_block + b] = {blk_size, 1, 0}; + inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size}; + auto const dst_offset = b * (padded_block_header_size + padded_comp_block_size); + outputs[ss.first_block + b] = {dst + dst_offset, max_comp_blk_size}; + results[ss.first_block + b] = {0, compression_status::FAILURE}; } } @@ -1190,9 +1197,9 @@ __global__ void __launch_bounds__(256) * * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[out] inputs Per-block compression input buffers + * @param[in] inputs Per-block compression input buffers * @param[out] outputs Per-block compression output buffers - * @param[out] statuses Per-block compression status + * @param[out] results Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression @@ -1202,7 +1209,7 @@ __global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(device_2dspan strm_desc, device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, uint8_t* compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) @@ -1228,16 +1235,16 @@ __global__ void __launch_bounds__(1024) if (t == 0) { auto const src_len = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); - auto dst_len = (statuses[ss.first_block + b].status == 0) - ? statuses[ss.first_block + b].bytes_written + auto dst_len = (results[ss.first_block + b].status == compression_status::SUCCESS) + ? 
results[ss.first_block + b].bytes_written : src_len; uint32_t blk_size24{}; - if (statuses[ss.first_block + b].status == 0) { + if (results[ss.first_block + b].status == compression_status::SUCCESS) { // Copy from uncompressed source - src = inputs[ss.first_block + b].data(); - statuses[ss.first_block + b].bytes_written = src_len; - dst_len = src_len; - blk_size24 = dst_len * 2 + 1; + src = inputs[ss.first_block + b].data(); + results[ss.first_block + b].bytes_written = src_len; + dst_len = src_len; + blk_size24 = dst_len * 2 + 1; } else { // Compressed block src = outputs[ss.first_block + b].data(); @@ -1307,51 +1314,59 @@ void CompressOrcDataStreams(uint8_t* compressed_data, CompressionKind compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, + uint32_t comp_block_align, device_2dspan strm_desc, device_2dspan enc_streams, - device_span> comp_in, - device_span> comp_out, - device_span comp_stat, + device_span comp_res, rmm::cuda_stream_view stream) { + rmm::device_uvector> comp_in(num_compressed_blocks, stream); + rmm::device_uvector> comp_out(num_compressed_blocks, stream); + dim3 dim_block_init(256, 1); dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); gpuInitCompressionBlocks<<>>(strm_desc, enc_streams, comp_in, comp_out, - comp_stat, + comp_res, compressed_data, comp_blk_size, - max_comp_blk_size); + max_comp_blk_size, + comp_block_align); if (compression == SNAPPY) { try { - if (detail::nvcomp_integration::is_stable_enabled()) { + if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) { nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_stat, comp_blk_size, stream); + nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); } else { - gpu_snap(comp_in, comp_out, comp_stat, stream); + gpu_snap(comp_in, comp_out, comp_res, stream); } } catch (...) 
{ // There was an error in compressing so set an error status for each block - thrust::for_each(rmm::exec_policy(stream), - comp_stat.begin(), - comp_stat.end(), - [] __device__(decompress_status & stat) { stat.status = 1; }); + thrust::for_each( + rmm::exec_policy(stream), + comp_res.begin(), + comp_res.end(), + [] __device__(compression_result & stat) { stat.status = compression_status::FAILURE; }); // Since SNAPPY is the default compression (may not be explicitly requested), fall back to // writing without compression } - } else if (compression == ZLIB and detail::nvcomp_integration::is_all_enabled()) { + } else if (compression == ZLIB and + nvcomp::is_compression_enabled(nvcomp::compression_type::DEFLATE)) { nvcomp::batched_compress( - nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_stat, comp_blk_size, stream); + nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream); + } else if (compression == ZSTD and + nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) { + nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); } else if (compression != NONE) { CUDF_FAIL("Unsupported compression type"); } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( - strm_desc, comp_in, comp_out, comp_stat, compressed_data, comp_blk_size, max_comp_blk_size); + strm_desc, comp_in, comp_out, comp_res, compressed_data, comp_blk_size, max_comp_blk_size); } } // namespace gpu diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index edae60bfa6d..bd65089810e 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -160,7 +160,7 @@ __global__ void __launch_bounds__(128, 8) const uint8_t* cur = s->info.compressed_data; const uint8_t* end = cur + s->info.compressed_data_size; auto dec_out = s->info.dec_out_ctl; - auto dec_status = s->info.decstatus; + auto dec_result = s->info.dec_res; uint8_t* uncompressed_actual = s->info.uncompressed_data; uint8_t* uncompressed_estimated = uncompressed_actual; uint32_t num_compressed_blocks = 0; @@ -178,13 +178,9 @@ __global__ void __launch_bounds__(128, 8) uncompressed_size_actual = block_len; } else { if (num_compressed_blocks > max_compressed_blocks) { break; } - if (shuffle((lane_id == 0) ? dec_status[num_compressed_blocks].status : 0) != 0) { - // Decompression failed, not much point in doing anything else - break; - } uint32_t const dst_size = dec_out[num_compressed_blocks].size(); uncompressed_size_est = shuffle((lane_id == 0) ? dst_size : 0); - uint32_t const bytes_written = dec_status[num_compressed_blocks].bytes_written; + uint32_t const bytes_written = dec_result[num_compressed_blocks].bytes_written; uncompressed_size_actual = shuffle((lane_id == 0) ? 
bytes_written : 0); } // In practice, this should never happen with a well-behaved writer, as we would expect the @@ -383,7 +379,7 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, const uint8_t* start = s->strm_info[ci_id].compressed_data; const uint8_t* cur = start; const uint8_t* end = cur + s->strm_info[ci_id].compressed_data_size; - auto decstatus = s->strm_info[ci_id].decstatus.data(); + auto dec_result = s->strm_info[ci_id].dec_res.data(); uint32_t uncomp_offset = 0; for (;;) { uint32_t block_len; @@ -400,8 +396,8 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, if (is_uncompressed) { uncomp_offset += block_len; } else { - uncomp_offset += decstatus->bytes_written; - decstatus++; + uncomp_offset += dec_result->bytes_written; + dec_result++; } } s->rowgroups[t].strm_offset[ci_id] += uncomp_offset; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 90858ac6fcc..a5e9e9da4cb 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -85,7 +85,18 @@ template using pinned_buffer = std::unique_ptr; /** - * @brief Function that translates GDF compression to ORC compression + * @brief Translates ORC compression to nvCOMP compression + */ +auto to_nvcomp_compression_type(CompressionKind compression_kind) +{ + if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; + if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; + if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD; + CUDF_FAIL("Unsupported compression type"); +} + +/** + * @brief Translates cuDF compression to ORC compression */ orc::CompressionKind to_orc_compression(compression_type compression) { @@ -93,27 +104,30 @@ orc::CompressionKind to_orc_compression(compression_type compression) case compression_type::AUTO: case compression_type::SNAPPY: return orc::CompressionKind::SNAPPY; case compression_type::ZLIB: return orc::CompressionKind::ZLIB; + case compression_type::ZSTD: return orc::CompressionKind::ZSTD; case compression_type::NONE: return orc::CompressionKind::NONE; - default: CUDF_FAIL("Unsupported compression type"); return orc::CompressionKind::NONE; + default: CUDF_FAIL("Unsupported compression type"); } } /** * @brief Returns the block size for a given compression kind. - * - * The nvCOMP ZLIB compression is limited to blocks up to 64KiB. */ constexpr size_t compression_block_size(orc::CompressionKind compression) { - switch (compression) { - case orc::CompressionKind::NONE: return 0; - case orc::CompressionKind::ZLIB: return 64 * 1024; - default: return 256 * 1024; - } + if (compression == orc::CompressionKind::NONE) { return 0; } + + auto const ncomp_type = to_nvcomp_compression_type(compression); + auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type) + ? 
nvcomp::compress_max_allowed_chunk_size(ncomp_type) + : std::nullopt; + + constexpr size_t max_block_size = 256 * 1024; + return std::min(nvcomp_limit.value_or(max_block_size), max_block_size); } /** - * @brief Function that translates GDF dtype to ORC datatype + * @brief Translates cuDF dtype to ORC datatype */ constexpr orc::TypeKind to_orc_type(cudf::type_id id, bool list_column_as_map) { @@ -520,6 +534,26 @@ constexpr size_t RLE_stream_size(TypeKind kind, size_t count) } } +auto uncomp_block_alignment(CompressionKind compression_kind) +{ + if (compression_kind == NONE or + not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) { + return 1u; + } + + return 1u << nvcomp::compress_input_alignment_bits(to_nvcomp_compression_type(compression_kind)); +} + +auto comp_block_alignment(CompressionKind compression_kind) +{ + if (compression_kind == NONE or + not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) { + return 1u; + } + + return 1u << nvcomp::compress_output_alignment_bits(to_nvcomp_compression_type(compression_kind)); +} + orc_streams writer::impl::create_streams(host_span columns, file_segmentation const& segmentation, std::map const& decimal_column_sizes) @@ -565,9 +599,13 @@ orc_streams writer::impl::create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - const auto base = column.index() * gpu::CI_NUM_STREAMS; - ids[base + index_type] = streams.size(); - streams.push_back(orc::Stream{kind, column.id(), size}); + auto const max_alignment_padding = uncomp_block_alignment(compression_kind_) - 1; + const auto base = column.index() * gpu::CI_NUM_STREAMS; + ids[base + index_type] = streams.size(); + streams.push_back(orc::Stream{ + kind, + column.id(), + (size == 0) ? 
0 : size + max_alignment_padding * segmentation.num_rowgroups()}); types.push_back(type_kind); }; @@ -868,6 +906,7 @@ encoded_data encode_columns(orc_table_view const& orc_table, encoder_decimal_info&& dec_chunk_sizes, file_segmentation const& segmentation, orc_streams const& streams, + uint32_t uncomp_block_align, rmm::cuda_stream_view stream) { auto const num_columns = orc_table.num_columns(); @@ -1020,10 +1059,16 @@ encoded_data encode_columns(orc_table_view const& orc_table, strm.lengths[strm_type] = 0; strm.data_ptrs[strm_type] = nullptr; } + auto const misalignment = + reinterpret_cast(strm.data_ptrs[strm_type]) % uncomp_block_align; + if (misalignment != 0) { + strm.data_ptrs[strm_type] += (uncomp_block_align - misalignment); + } } } } } + chunk_streams.host_to_device(stream); if (orc_table.num_rows() > 0) { @@ -1340,7 +1385,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, - host_span comp_out, + host_span comp_res, std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, @@ -1365,17 +1410,17 @@ void writer::impl::write_index_stream(int32_t stripe_id, } return record; }; - auto scan_record = [=, &comp_out](gpu::encoder_chunk_streams const& stream, + auto scan_record = [=, &comp_res](gpu::encoder_chunk_streams const& stream, gpu::StreamIndexType type, row_group_index_info& record) { if (record.pos >= 0) { record.pos += stream.lengths[type]; while ((record.pos >= 0) && (record.blk_pos >= 0) && (static_cast(record.pos) >= compression_blocksize_) && - (record.comp_pos + block_header_size + comp_out[record.blk_pos].bytes_written < + (record.comp_pos + block_header_size + comp_res[record.blk_pos].bytes_written < static_cast(record.comp_size))) { record.pos -= compression_blocksize_; - record.comp_pos += block_header_size + comp_out[record.blk_pos].bytes_written; + record.comp_pos += block_header_size + comp_res[record.blk_pos].bytes_written; record.blk_pos += 1; } } @@ -2007,20 +2052,12 @@ __global__ void copy_string_data(char* string_pool, } } -auto to_nvcomp_compression_type(CompressionKind compression_kind) -{ - if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; - if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; - CUDF_FAIL("Unsupported compression type"); -} - -size_t get_compress_max_output_chunk_size(CompressionKind compression_kind, - uint32_t compression_blocksize) +size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) { if (compression_kind == NONE) return 0; - return batched_compress_get_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); + return compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), + compression_blocksize); } void writer::impl::persisted_statistics::persist(int num_table_rows, @@ -2124,10 +2161,16 @@ void writer::impl::write(table_view const& table) auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); + auto const uncomp_block_align = uncomp_block_alignment(compression_kind_); auto streams = create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes)); - auto enc_data = encode_columns( - orc_table, std::move(dictionaries), std::move(dec_chunk_sizes), segmentation, streams, stream); + auto enc_data = encode_columns(orc_table, + std::move(dictionaries), + std::move(dec_chunk_sizes), + segmentation, + streams, + uncomp_block_align, 
+ stream); // Assemble individual disparate column chunks into contiguous data streams size_type const num_index_streams = (orc_table.num_columns() + 1); @@ -2140,8 +2183,13 @@ void writer::impl::write(table_view const& table) // Allocate intermediate output stream buffer size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; + auto const max_compressed_block_size = - get_compress_max_output_chunk_size(compression_kind_, compression_blocksize_); + max_compression_output_size(compression_kind_, compression_blocksize_); + auto const padded_max_compressed_block_size = + util::round_up_unsafe(max_compressed_block_size, uncomp_block_align); + auto const padded_block_header_size = + util::round_up_unsafe(block_header_size, uncomp_block_align); auto stream_output = [&]() { size_t max_stream_size = 0; @@ -2158,7 +2206,8 @@ void writer::impl::write(table_view const& table) (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); stream_size += num_blocks * block_header_size; num_compressed_blocks += num_blocks; - compressed_bfr_size += compressed_block_size(max_compressed_block_size) * num_blocks; + compressed_bfr_size += + (padded_block_header_size + padded_max_compressed_block_size) * num_blocks; } max_stream_size = std::max(max_stream_size, stream_size); } @@ -2177,9 +2226,11 @@ void writer::impl::write(table_view const& table) // Compress the data streams rmm::device_buffer compressed_data(compressed_bfr_size, stream); - hostdevice_vector> comp_in(num_compressed_blocks, stream); - hostdevice_vector> comp_out(num_compressed_blocks, stream); - hostdevice_vector comp_stats(num_compressed_blocks, stream); + hostdevice_vector comp_results(num_compressed_blocks, stream); + thrust::fill(rmm::exec_policy(stream), + comp_results.d_begin(), + comp_results.d_end(), + compression_result{0, compression_status::FAILURE}); if (compression_kind_ != NONE) { strm_descs.host_to_device(stream); gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), @@ -2187,14 +2238,13 @@ void writer::impl::write(table_view const& table) compression_kind_, compression_blocksize_, max_compressed_block_size, + comp_block_alignment(compression_kind_), strm_descs, enc_data.streams, - comp_in, - comp_out, - comp_stats, + comp_results, stream); strm_descs.device_to_host(stream); - comp_stats.device_to_host(stream, true); + comp_results.device_to_host(stream, true); } ProtobufWriter pbw_(&buffer_); @@ -2221,7 +2271,7 @@ void writer::impl::write(table_view const& table) segmentation, enc_data.streams, strm_descs, - comp_stats, + comp_results, intermediate_stats.rowgroup_blobs, &stripe, &streams, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index ed360a77632..dc8aad33af0 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -390,7 +390,7 @@ class writer::impl { file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, - host_span comp_out, + host_span comp_out, std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index f06488671c3..77984ee3c27 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -228,7 +228,8 @@ __global__ void __launch_bounds__(128) statistics_merge_group* chunk_grstats, int32_t num_columns, size_t max_page_size_bytes, - size_type max_page_size_rows) + size_type max_page_size_rows, + uint32_t page_align) { // TODO: All writing seems to be done by thread 0. 
Could be replaced by thrust foreach __shared__ __align__(8) parquet_column_device_view col_g; @@ -284,7 +285,8 @@ __global__ void __launch_bounds__(128) page_g.num_rows = ck_g.num_dict_entries; page_g.num_leaf_values = ck_g.num_dict_entries; page_g.num_values = ck_g.num_dict_entries; // TODO: shouldn't matter for dict page - page_offset += page_g.max_hdr_size + page_g.max_data_size; + page_offset += + util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align); if (not comp_page_sizes.empty()) { comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page]; } @@ -360,7 +362,8 @@ __global__ void __launch_bounds__(128) } page_g.max_hdr_size += stats_hdr_len; } - page_g.page_data = ck_g.uncompressed_bfr + page_offset; + page_g.max_hdr_size = util::round_up_unsafe(page_g.max_hdr_size, page_align); + page_g.page_data = ck_g.uncompressed_bfr + page_offset; if (not comp_page_sizes.empty()) { page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset; } @@ -384,7 +387,8 @@ __global__ void __launch_bounds__(128) pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; - page_offset += page_g.max_hdr_size + page_g.max_data_size; + page_offset += + util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align); if (not comp_page_sizes.empty()) { comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page + num_pages]; } @@ -422,7 +426,7 @@ __global__ void __launch_bounds__(128) __syncwarp(); if (!t) { if (ck_g.ck_stat_size == 0 && ck_g.stats) { - uint32_t ck_stat_size = 48 + 2 * ck_max_stats_len; + uint32_t ck_stat_size = util::round_up_unsafe(48 + 2 * ck_max_stats_len, page_align); page_offset += ck_stat_size; comp_page_offset += ck_stat_size; ck_g.ck_stat_size = ck_stat_size; @@ -866,7 +870,7 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_stats) + device_span comp_results) { __shared__ __align__(8) page_enc_state_s state_g; using block_scan = cub::BlockScan; @@ -1213,18 +1217,17 @@ __global__ void __launch_bounds__(128, 8) } } if (t == 0) { - uint8_t* base = s->page.page_data + s->page.max_hdr_size; - auto actual_data_size = static_cast(s->cur - base); - uint32_t compressed_bfr_size = GetMaxCompressedBfrSize(actual_data_size); - s->page.max_data_size = actual_data_size; + uint8_t* base = s->page.page_data + s->page.max_hdr_size; + auto actual_data_size = static_cast(s->cur - base); + s->page.max_data_size = actual_data_size; if (not comp_in.empty()) { comp_in[blockIdx.x] = {base, actual_data_size}; - comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size, compressed_bfr_size}; + comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size, 0}; // size is unused } pages[blockIdx.x] = s->page; - if (not comp_stats.empty()) { - comp_stats[blockIdx.x] = {0, ~0u}; - pages[blockIdx.x].comp_stat = &comp_stats[blockIdx.x]; + if (not comp_results.empty()) { + comp_results[blockIdx.x] = {0, compression_status::FAILURE}; + pages[blockIdx.x].comp_res = &comp_results[blockIdx.x]; } } } @@ -1257,10 +1260,10 @@ __global__ void __launch_bounds__(128) gpuDecideCompression(device_spanbytes_written; - if (comp_status->status != 0) { atomicAdd(&error_count, 1); } + compressed_data_size += comp_res->bytes_written; + if (comp_res->status != compression_status::SUCCESS) { atomicAdd(&error_count, 1); } } } uncompressed_data_size = warp_reduce(temp_storage[0]).Sum(uncompressed_data_size); 
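The buffer bookkeeping above (padded block headers, padded maximum compressed block sizes, and realigned encoded-stream pointers) reduces to rounding sizes and addresses up to the alignment that nvCOMP reports for its compression inputs. The following standalone sketch uses illustrative names rather than cudf's own utilities, and assumes the alignment is a non-zero power of two as derived from compress_input_alignment_bits:

#include <cstddef>
#include <cstdint>
#include <cstdio>

// Round a size up to the next multiple of `align` (the role util::round_up_unsafe
// plays in the hunks above); `align` is assumed to be non-zero.
constexpr std::size_t round_up(std::size_t value, std::size_t align)
{
  return ((value + align - 1) / align) * align;
}

// Advance a pointer to the next `align`-byte boundary, mirroring the adjustment
// applied to the encoded ORC stream data pointers before compression.
inline std::uint8_t* align_ptr(std::uint8_t* ptr, std::size_t align)
{
  auto const misalignment = reinterpret_cast<std::uintptr_t>(ptr) % align;
  return misalignment == 0 ? ptr : ptr + (align - misalignment);
}

int main()
{
  // With a 64-byte requirement, budgeting 128 bytes for a 100-byte block keeps
  // every block in the shared output buffer at an aligned offset.
  std::printf("%zu\n", round_up(100, 64));  // prints 128
  return 0;
}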
@@ -1677,7 +1680,7 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start, // blockDim(128, 1, 1) __global__ void __launch_bounds__(128) gpuEncodePageHeaders(device_span pages, - device_span comp_stat, + device_span comp_results, device_span page_stats, const statistics_chunk* chunk_stats) { @@ -1706,7 +1709,7 @@ __global__ void __launch_bounds__(128) uncompressed_page_size = page_g.max_data_size; if (ck_g.is_compressed) { hdr_start = page_g.compressed_data; - compressed_page_size = (uint32_t)comp_stat[blockIdx.x].bytes_written; + compressed_page_size = (uint32_t)comp_results[blockIdx.x].bytes_written; page_g.max_data_size = compressed_page_size; } else { hdr_start = page_g.page_data; @@ -2041,6 +2044,7 @@ void InitEncoderPages(device_2dspan chunks, int32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + uint32_t page_align, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream) @@ -2056,19 +2060,21 @@ void InitEncoderPages(device_2dspan chunks, chunk_grstats, num_columns, max_page_size_bytes, - max_page_size_rows); + max_page_size_rows, + page_align); } void EncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_stats, + device_span comp_results, rmm::cuda_stream_view stream) { auto num_pages = pages.size(); // A page is part of one column. This is launching 1 block per page. 1 block will exclusively // deal with one datatype. - gpuEncodePages<128><<>>(pages, comp_in, comp_out, comp_stats); + gpuEncodePages<128> + <<>>(pages, comp_in, comp_out, comp_results); } void DecideCompression(device_span chunks, rmm::cuda_stream_view stream) @@ -2077,7 +2083,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view } void EncodePageHeaders(device_span pages, - device_span comp_stats, + device_span comp_results, device_span page_stats, const statistics_chunk* chunk_stats, rmm::cuda_stream_view stream) @@ -2085,7 +2091,7 @@ void EncodePageHeaders(device_span pages, // TODO: single thread task. No need for 128 threads/block. Earlier it used to employ rest of the // threads to coop load structs gpuEncodePageHeaders<<>>( - pages, comp_stats, page_stats, chunk_stats); + pages, comp_results, page_stats, chunk_stats); } void GatherPages(device_span chunks, diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 610275ee26b..d0d367df962 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -322,15 +322,6 @@ inline size_type __device__ row_to_value_idx(size_type idx, return idx; } -/** - * @brief Return worst-case compressed size of compressed data given the uncompressed size - */ -inline size_t __device__ __host__ GetMaxCompressedBfrSize(size_t uncomp_size, - uint32_t num_pages = 1) -{ - return uncomp_size + (uncomp_size >> 7) + num_pages * 8; -} - struct EncPage; /** @@ -389,7 +380,7 @@ struct EncPage { uint32_t num_leaf_values; //!< Values in page. Different from num_rows in case of nested types uint32_t num_values; //!< Number of def/rep level values in page. 
Includes null/empty elements in //!< non-leaf levels - decompress_status* comp_stat; //!< Ptr to compression status + compression_result* comp_res; //!< Ptr to compression result }; /** @@ -544,6 +535,7 @@ void get_dictionary_indices(cudf::detail::device_2dspan * @param[in] num_rowgroups Number of fragments per column * @param[in] num_columns Number of columns * @param[in] page_grstats Setup for page-level stats + * @param[in] page_align Required alignment for uncompressed pages * @param[in] chunk_grstats Setup for chunk-level stats * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 @@ -556,6 +548,7 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, int32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + uint32_t page_align, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream); @@ -566,13 +559,13 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, * @param[in,out] pages Device array of EncPages (unordered) * @param[out] comp_in Compressor input buffers * @param[out] comp_in Compressor output buffers - * @param[out] comp_stats Compressor statuses + * @param[out] comp_stats Compressor results * @param[in] stream CUDA stream to use, default 0 */ void EncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_stats, + device_span comp_res, rmm::cuda_stream_view stream); /** @@ -593,7 +586,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view * @param[in] stream CUDA stream to use, default 0 */ void EncodePageHeaders(device_span pages, - device_span comp_stats, + device_span comp_res, device_span page_stats, const statistics_chunk* chunk_stats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 2553b375e72..59bef6f5600 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -247,13 +247,15 @@ std::tuple conversion_info(type_id column_type_id, return std::make_tuple(type_width, clock_rate, converted_type); } -inline void decompress_check(device_span stats, +inline void decompress_check(device_span results, rmm::cuda_stream_view stream) { CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream), - stats.begin(), - stats.end(), - [] __device__(auto const& stat) { return stat.status == 0; }), + results.begin(), + results.end(), + [] __device__(auto const& res) { + return res.status == compression_status::SUCCESS; + }), "Error during decompression"); } } // namespace @@ -1149,11 +1151,11 @@ rmm::device_buffer reader::impl::decompress_page_data( std::vector> comp_out; comp_out.reserve(num_comp_pages); - rmm::device_uvector comp_stats(num_comp_pages, _stream); + rmm::device_uvector comp_res(num_comp_pages, _stream); thrust::fill(rmm::exec_policy(_stream), - comp_stats.begin(), - comp_stats.end(), - decompress_status{0, static_cast(-1000), 0}); + comp_res.begin(), + comp_res.end(), + compression_result{0, compression_status::FAILURE}); size_t decomp_offset = 0; int32_t start_pos = 0; @@ -1177,31 +1179,30 @@ rmm::device_buffer reader::impl::decompress_page_data( host_span const> comp_out_view(comp_out.data() + start_pos, codec.num_pages); auto const d_comp_out = cudf::detail::make_device_uvector_async(comp_out_view, _stream); - device_span d_comp_stats_view(comp_stats.data() + start_pos, - codec.num_pages); + device_span d_comp_res_view(comp_res.data() + start_pos, 
codec.num_pages); switch (codec.compression_type) { case parquet::GZIP: - gpuinflate(d_comp_in, d_comp_out, d_comp_stats_view, gzip_header_included::YES, _stream); + gpuinflate(d_comp_in, d_comp_out, d_comp_res_view, gzip_header_included::YES, _stream); break; case parquet::SNAPPY: if (nvcomp_integration::is_stable_enabled()) { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, d_comp_in, d_comp_out, - d_comp_stats_view, + d_comp_res_view, codec.max_decompressed_size, codec.total_decomp_size, _stream); } else { - gpu_unsnap(d_comp_in, d_comp_out, d_comp_stats_view, _stream); + gpu_unsnap(d_comp_in, d_comp_out, d_comp_res_view, _stream); } break; case parquet::ZSTD: nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, d_comp_in, d_comp_out, - d_comp_stats_view, + d_comp_res_view, codec.max_decompressed_size, codec.total_decomp_size, _stream); @@ -1209,7 +1210,7 @@ rmm::device_buffer reader::impl::decompress_page_data( case parquet::BROTLI: gpu_debrotli(d_comp_in, d_comp_out, - d_comp_stats_view, + d_comp_res_view, debrotli_scratch.data(), debrotli_scratch.size(), _stream); @@ -1219,7 +1220,7 @@ rmm::device_buffer reader::impl::decompress_page_data( start_pos += codec.num_pages; } - decompress_check(comp_stats, _stream); + decompress_check(comp_res, _stream); // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 58910420173..2bfd7c1ba4d 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -24,6 +24,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" +#include #include #include #include @@ -43,8 +44,6 @@ #include #include -#include - #include #include #include @@ -79,6 +78,7 @@ parquet::Compression to_parquet_compression(compression_type compression) switch (compression) { case compression_type::AUTO: case compression_type::SNAPPY: return parquet::Compression::SNAPPY; + case compression_type::ZSTD: return parquet::Compression::ZSTD; case compression_type::NONE: return parquet::Compression::UNCOMPRESSED; default: CUDF_FAIL("Unsupported compression type"); } @@ -907,11 +907,36 @@ void writer::impl::gather_fragment_statistics( stream.synchronize(); } +auto to_nvcomp_compression_type(Compression codec) +{ + if (codec == Compression::SNAPPY) return nvcomp::compression_type::SNAPPY; + if (codec == Compression::ZSTD) return nvcomp::compression_type::ZSTD; + CUDF_FAIL("Unsupported compression type"); +} + +auto page_alignment(Compression codec) +{ + if (codec == Compression::UNCOMPRESSED or + not nvcomp::is_compression_enabled(to_nvcomp_compression_type(codec))) { + return 1u; + } + + return 1u << nvcomp::compress_input_alignment_bits(to_nvcomp_compression_type(codec)); +} + +size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) +{ + if (codec == Compression::UNCOMPRESSED) return 0; + + return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); +} + auto init_page_sizes(hostdevice_2dvector& chunks, device_span col_desc, uint32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + Compression compression_codec, rmm::cuda_stream_view stream) { if (chunks.is_empty()) { return hostdevice_vector{}; } @@ -926,6 +951,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_codec), nullptr, 
nullptr, stream); @@ -949,6 +975,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_codec), nullptr, nullptr, stream); @@ -956,12 +983,12 @@ auto init_page_sizes(hostdevice_2dvector& chunks, // Get per-page max compressed size hostdevice_vector comp_page_sizes(num_pages, stream); - std::transform(page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [](auto page_size) { - size_t page_comp_max_size = 0; - nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - page_size, nvcompBatchedSnappyDefaultOpts, &page_comp_max_size); - return page_comp_max_size; - }); + std::transform(page_sizes.begin(), + page_sizes.end(), + comp_page_sizes.begin(), + [compression_codec](auto page_size) { + return max_compression_output_size(compression_codec, page_size); + }); comp_page_sizes.host_to_device(stream); // Use per-page max compressed size to calculate chunk.compressed_size @@ -973,6 +1000,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_codec), nullptr, nullptr, stream); @@ -1091,6 +1119,7 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_), (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, stream); @@ -1109,83 +1138,6 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& stream.synchronize(); } -void snappy_compress(device_span const> comp_in, - device_span const> comp_out, - device_span comp_stats, - size_t max_page_uncomp_data_size, - rmm::cuda_stream_view stream) -{ - size_t num_comp_pages = comp_in.size(); - try { - size_t temp_size; - nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( - num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size); - - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, - "Error in getting snappy compression scratch size"); - - // Not needed now but nvcomp API makes no promises about future - rmm::device_buffer scratch(temp_size, stream); - // Analogous to comp_in.srcDevice - rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_in.srcSize - rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); - // Analogous to comp_in.dstDevice - rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_stat.bytes_written - rmm::device_uvector compressed_bytes_written(num_comp_pages, stream); - // nvcomp does not currently use comp_in.dstSize. 
Cannot assume that the output will fit in - // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() - - // Prepare the vectors - auto comp_it = - thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin()); - thrust::transform( - rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(auto const& in) { return thrust::make_tuple(in.data(), in.size()); }); - - thrust::transform(rmm::exec_policy(stream), - comp_out.begin(), - comp_out.end(), - compressed_data_ptrs.begin(), - [] __device__(auto const& out) { return out.data(); }); - nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - max_page_uncomp_data_size, - num_comp_pages, - scratch.data(), // Not needed rn but future - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data(), - nvcompBatchedSnappyDefaultOpts, - stream.value()); - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); - - // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, - // compression will succeed. - // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. - thrust::transform(rmm::exec_policy(stream), - compressed_bytes_written.begin(), - compressed_bytes_written.end(), - comp_stats.begin(), - [] __device__(size_t size) { - decompress_status status{}; - status.bytes_written = size; - return status; - }); - return; - } catch (...) { - // If we reach this then there was an error in compressing so set an error status for each page - thrust::for_each(rmm::exec_policy(stream), - comp_stats.begin(), - comp_stats.end(), - [] __device__(decompress_status & stat) { stat.status = 1; }); - }; -} - void writer::impl::encode_pages(hostdevice_2dvector& chunks, device_span pages, size_t max_page_uncomp_data_size, @@ -1209,24 +1161,37 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks rmm::device_uvector> comp_in(max_comp_pages, stream); rmm::device_uvector> comp_out(max_comp_pages, stream); - rmm::device_uvector comp_stats(max_comp_pages, stream); + rmm::device_uvector comp_res(max_comp_pages, stream); + thrust::fill(rmm::exec_policy(stream), + comp_res.begin(), + comp_res.end(), + compression_result{0, compression_status::FAILURE}); - gpu::EncodePages(batch_pages, comp_in, comp_out, comp_stats, stream); + gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream); switch (compression_) { case parquet::Compression::SNAPPY: - if (nvcomp_integration::is_stable_enabled()) { - snappy_compress(comp_in, comp_out, comp_stats, max_page_uncomp_data_size, stream); + if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) { + nvcomp::batched_compress( + nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); } else { - gpu_snap(comp_in, comp_out, comp_stats, stream); + gpu_snap(comp_in, comp_out, comp_res, stream); + } + break; + case parquet::Compression::ZSTD: + if (nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) { + nvcomp::batched_compress( + nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); } break; - default: break; + case parquet::Compression::UNCOMPRESSED: break; + default: CUDF_FAIL("invalid compression type"); } + // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level auto d_chunks_in_batch = chunks.device_view().subspan(first_rowgroup, 
rowgroups_in_batch); DecideCompression(d_chunks_in_batch.flat_view(), stream); - EncodePageHeaders(batch_pages, comp_stats, batch_pages_stats, chunk_stats, stream); + EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, stream); GatherPages(d_chunks_in_batch.flat_view(), pages, stream); if (column_stats != nullptr) { @@ -1274,6 +1239,18 @@ size_t writer::impl::column_index_buffer_size(gpu::EncColumnChunk* ck) const return ck->ck_stat_size * ck->num_pages + column_index_truncate_length + padding; } +size_t max_page_bytes(Compression compression, size_t max_page_size_bytes) +{ + if (compression == parquet::Compression::UNCOMPRESSED) { return max_page_size_bytes; } + + auto const ncomp_type = to_nvcomp_compression_type(compression); + auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type) + ? nvcomp::compress_max_allowed_chunk_size(ncomp_type) + : std::nullopt; + + return std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes); +} + writer::impl::impl(std::vector> sinks, parquet_writer_options const& options, SingleWriteMode mode, @@ -1281,11 +1258,11 @@ writer::impl::impl(std::vector> sinks, rmm::mr::device_memory_resource* mr) : _mr(mr), stream(stream), + compression_(to_parquet_compression(options.get_compression())), max_row_group_size{options.get_row_group_size_bytes()}, max_row_group_rows{options.get_row_group_size_rows()}, - max_page_size_bytes(options.get_max_page_size_bytes()), + max_page_size_bytes(max_page_bytes(compression_, options.get_max_page_size_bytes())), max_page_size_rows(options.get_max_page_size_rows()), - compression_(to_parquet_compression(options.get_compression())), stats_granularity_(options.get_stats_level()), int96_timestamps(options.is_enabled_int96_timestamps()), column_index_truncate_length(options.get_column_index_truncate_length()), @@ -1306,11 +1283,11 @@ writer::impl::impl(std::vector> sinks, rmm::mr::device_memory_resource* mr) : _mr(mr), stream(stream), + compression_(to_parquet_compression(options.get_compression())), max_row_group_size{options.get_row_group_size_bytes()}, max_row_group_rows{options.get_row_group_size_rows()}, - max_page_size_bytes(options.get_max_page_size_bytes()), + max_page_size_bytes(max_page_bytes(compression_, options.get_max_page_size_bytes())), max_page_size_rows(options.get_max_page_size_rows()), - compression_(to_parquet_compression(options.get_compression())), stats_granularity_(options.get_stats_level()), int96_timestamps(options.is_enabled_int96_timestamps()), column_index_truncate_length(options.get_column_index_truncate_length()), @@ -1405,13 +1382,15 @@ void writer::impl::write(table_view const& table, std::vector co // iteratively reduce this value if the largest fragment exceeds the max page size limit (we // ideally want the page size to be below 1MB so as to have enough pages to get good // compression/decompression performance). 
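To make the scaling below concrete, here is a small illustration; the constants are assumptions chosen for the example, not values defined by this change:

#include <cstddef>

// Fragment size shrinks in proportion to the page-size limit, so a codec that
// caps the page size (as ZSTD can via nvCOMP's maximum chunk size) also gets
// proportionally smaller fragments.
constexpr std::size_t scaled_fragment_size(std::size_t default_fragment_size,
                                           std::size_t max_page_size_bytes,
                                           std::size_t default_max_page_size_bytes)
{
  return default_fragment_size * max_page_size_bytes / default_max_page_size_bytes;
}

// Example: a 64 KiB page limit against a 512 KiB default turns a 5000-row
// fragment into a 625-row fragment.
static_assert(scaled_fragment_size(5000, 64 * 1024, 512 * 1024) == 625, "illustrative values");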
- using cudf::io::parquet::gpu::max_page_fragment_size; + auto max_page_fragment_size = + (cudf::io::parquet::gpu::max_page_fragment_size * max_page_size_bytes) / + default_max_page_size_bytes; std::vector num_frag_in_part; std::transform(partitions.begin(), partitions.end(), std::back_inserter(num_frag_in_part), - [](auto const& part) { + [max_page_fragment_size](auto const& part) { return util::div_rounding_up_unsafe(part.num_rows, max_page_fragment_size); }); @@ -1561,8 +1540,8 @@ void writer::impl::write(table_view const& table, std::vector co } // Build chunk dictionaries and count pages - hostdevice_vector comp_page_sizes = - init_page_sizes(chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, stream); + hostdevice_vector comp_page_sizes = init_page_sizes( + chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, compression_, stream); // Get the maximum page size across all chunks size_type max_page_uncomp_data_size = diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index c6309488d6b..cac75a5dcd9 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -208,11 +208,11 @@ class writer::impl { // Cuda stream to be used rmm::cuda_stream_view stream; + Compression compression_ = Compression::UNCOMPRESSED; size_t max_row_group_size = default_row_group_size_bytes; size_type max_row_group_rows = default_row_group_size_rows; size_t max_page_size_bytes = default_max_page_size_bytes; size_type max_page_size_rows = default_max_page_size_rows; - Compression compression_ = Compression::UNCOMPRESSED; statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE; bool int96_timestamps = false; size_type column_index_truncate_length = default_column_index_truncate_length; diff --git a/cpp/tests/io/comp/decomp_test.cpp b/cpp/tests/io/comp/decomp_test.cpp index 134f262cb13..c51a7854e25 100644 --- a/cpp/tests/io/comp/decomp_test.cpp +++ b/cpp/tests/io/comp/decomp_test.cpp @@ -58,7 +58,7 @@ struct DecompressTest : public cudf::test::BaseFixture { inf_out[0] = dst; inf_out.host_to_device(stream); - hostdevice_vector inf_stat(1, stream); + hostdevice_vector inf_stat(1, stream); inf_stat[0] = {}; inf_stat.host_to_device(stream); @@ -66,7 +66,7 @@ struct DecompressTest : public cudf::test::BaseFixture { cudaMemcpyAsync( decompressed->data(), dst.data(), dst.size(), cudaMemcpyDeviceToHost, stream.value()); inf_stat.device_to_host(stream, true); - ASSERT_EQ(inf_stat[0].status, 0); + ASSERT_EQ(inf_stat[0].status, cudf::io::compression_status::SUCCESS); } }; @@ -76,7 +76,7 @@ struct DecompressTest : public cudf::test::BaseFixture { struct GzipDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { cudf::io::gpuinflate(d_inf_in, d_inf_out, @@ -92,7 +92,7 @@ struct GzipDecompressTest : public DecompressTest { struct SnappyDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { cudf::io::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, cudf::default_stream_value); } @@ -104,7 +104,7 @@ struct SnappyDecompressTest : public DecompressTest { struct BrotliDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { rmm::device_buffer d_scratch{cudf::io::get_gpu_debrotli_scratch_size(1), cudf::default_stream_value}; diff 
--git a/java/src/main/java/ai/rapids/cudf/CompressionType.java b/java/src/main/java/ai/rapids/cudf/CompressionType.java index 48f980d7f71..96edf1a8add 100644 --- a/java/src/main/java/ai/rapids/cudf/CompressionType.java +++ b/java/src/main/java/ai/rapids/cudf/CompressionType.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,11 +44,21 @@ public enum CompressionType { ZIP(6), /** XZ format using LZMA(2) algorithm */ - XZ(7); + XZ(7), + + /** ZLIB format, using DEFLATE algorithm */ + ZLIB(8), + + /** LZ4 format, using LZ77 */ + LZ4(9), + + /** Lempel–Ziv–Oberhumer format */ + LZO(10), + + /** Zstandard format */ + ZSTD(11); final int nativeId; - CompressionType(int nativeId) { - this.nativeId = nativeId; - } + CompressionType(int nativeId) { this.nativeId = nativeId; } } diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 66b841fd273..1c9f388873c 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -163,6 +163,8 @@ cdef compression_type _get_comp_type(object compression): return compression_type.SNAPPY elif compression == "ZLIB": return compression_type.ZLIB + elif compression == "ZSTD": + return compression_type.ZSTD else: raise ValueError(f"Unsupported `compression` type {compression}") diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 84e0bba7133..3c8e78bd87a 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -670,6 +670,8 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression): return cudf_io_types.compression_type.NONE elif compression == "snappy": return cudf_io_types.compression_type.SNAPPY + elif compression == "ZSTD": + return cudf_io_types.compression_type.ZSTD else: raise ValueError("Unsupported `compression` type") diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index db52e51bd33..c2188003531 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1745,19 +1745,18 @@ def test_writer_protobuf_large_rowindexentry(): assert_frame_equal(df, got) -def test_orc_writer_zlib_compression(list_struct_buff): +@pytest.mark.parametrize("compression", ["ZLIB", "ZSTD"]) +def test_orc_writer_nvcomp(list_struct_buff, compression): expected = cudf.read_orc(list_struct_buff) + + buff = BytesIO() try: - # save with ZLIB compression - buff = BytesIO() - expected.to_orc(buff, compression="ZLIB") - got = cudf.read_orc(buff) + expected.to_orc(buff, compression=compression) + except RuntimeError: + pytest.mark.xfail(reason="Newer nvCOMP version is required") + else: + got = pd.read_orc(buff) assert_eq(expected, got) - except RuntimeError as e: - if "Unsupported compression type" in str(e): - pytest.mark.xfail(reason="nvcomp build doesn't have deflate") - else: - raise e @pytest.mark.parametrize("index", [True, False, None]) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 84d89618909..022f7cdd6f7 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2568,3 +2568,23 @@ def test_parquet_nested_struct_list(): actual = cudf.read_parquet(buffer) assert_eq(expected, actual) assert_eq(actual.a.dtype, df.a.dtype) + + +def test_parquet_writer_zstd(): + size = 12345 + expected = 
cudf.DataFrame(
+ {
+ "a": np.arange(0, stop=size, dtype="float64"),
+ "b": np.random.choice(list("abcd"), size=size),
+ "c": np.random.choice(np.arange(4), size=size),
+ }
+ )
+
+ buff = BytesIO()
+ try:
+ expected.to_parquet(buff, compression="ZSTD")
+ except RuntimeError:
+ pytest.xfail(reason="Newer nvCOMP version is required")
+ else:
+ got = pd.read_parquet(buff)
+ assert_eq(expected, got)
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index a3bb81c6c24..19815c7c506 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -208,7 +208,7 @@
 File path or Root Directory path. Will be used as Root Directory path
 while writing a partitioned dataset. Use list of str with partition_offsets
 to write parts of the dataframe to different files.
-compression : {'snappy', None}, default 'snappy'
+compression : {'snappy', 'ZSTD', None}, default 'snappy'
 Name of the compression to use. Use ``None`` for no compression.
 index : bool, default None
 If ``True``, include the dataframe's index(es) in the file output. If
@@ -429,7 +429,7 @@
 ----------
 fname : str
 File path or object where the ORC dataset will be stored.
-compression : {{ 'snappy', 'ZLIB', None }}, default None
+compression : {{ 'snappy', 'ZLIB', 'ZSTD', None }}, default None
 Name of the compression to use. Use None for no compression.
 enable_statistics: boolean, default True
 Enable writing column statistics.
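With these changes in place, requesting ZSTD from Python is a matter of passing compression="ZSTD" to DataFrame.to_parquet or DataFrame.to_orc, as the docstrings above describe. For the C++ layer those bindings call into, a minimal sketch looks like the following; the file paths are placeholders and error handling (for example, an nvCOMP version without ZSTD support) is omitted:

#include <cudf/io/orc.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

// Write the same table as ZSTD-compressed Parquet and ORC files.
void write_zstd_outputs(cudf::table_view const& tbl)
{
  auto parquet_opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"table.parquet"}, tbl)
      .compression(cudf::io::compression_type::ZSTD)
      .build();
  cudf::io::write_parquet(parquet_opts);

  auto orc_opts =
    cudf::io::orc_writer_options::builder(cudf::io::sink_info{"table.orc"}, tbl)
      .compression(cudf::io::compression_type::ZSTD)
      .build();
  cudf::io::write_orc(orc_opts);
}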