diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index e5b73dc9360..7fcdf1bf29a 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -164,7 +164,11 @@ rmm::device_buffer decompress_data(datasource& source, if (meta.codec == "deflate") { auto inflate_in = hostdevice_vector>(meta.block_list.size(), stream); auto inflate_out = hostdevice_vector>(meta.block_list.size(), stream); - auto inflate_stats = hostdevice_vector(meta.block_list.size(), stream); + auto inflate_stats = hostdevice_vector(meta.block_list.size(), stream); + thrust::fill(rmm::exec_policy(stream), + inflate_stats.d_begin(), + inflate_stats.d_end(), + compression_result{0, compression_status::FAILURE}); // Guess an initial maximum uncompressed block size. We estimate the compression factor is two // and round up to the next multiple of 4096 bytes. @@ -190,8 +194,6 @@ rmm::device_buffer decompress_data(datasource& source, for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) { inflate_out.host_to_device(stream); - CUDF_CUDA_TRY(cudaMemsetAsync( - inflate_stats.device_ptr(), 0, inflate_stats.memory_size(), stream.value())); gpuinflate(inflate_in, inflate_out, inflate_stats, gzip_header_included::NO, stream); inflate_stats.device_to_host(stream, true); @@ -204,9 +206,9 @@ rmm::device_buffer decompress_data(datasource& source, inflate_stats.begin(), std::back_inserter(actual_uncomp_sizes), [](auto const& inf_out, auto const& inf_stats) { - // If error status is 1 (buffer too small), the `bytes_written` field + // If error status is OUTPUT_OVERFLOW, the `bytes_written` field // actually contains the uncompressed data size - return inf_stats.status == 1 + return inf_stats.status == compression_status::OUTPUT_OVERFLOW ? std::max(inf_out.size(), inf_stats.bytes_written) : inf_out.size(); }); diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index 07dc2cc9870..b6f2d2db811 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -1906,7 +1906,7 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction * * @param[in] inputs Source buffer per block * @param[out] outputs Destination buffer per block - * @param[out] statuses Decompressor status per block + * @param[out] results Decompressor status per block * @param scratch Intermediate device memory heap space (will be dynamically shared between blocks) * @param scratch_size Size of scratch heap space (smaller sizes may result in serialization between * blocks) @@ -1914,7 +1914,7 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction __global__ void __launch_bounds__(block_size, 2) gpu_debrotli_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, uint8_t* scratch, uint32_t scratch_size) { @@ -2016,10 +2016,11 @@ __global__ void __launch_bounds__(block_size, 2) __syncthreads(); // Output decompression status if (!t) { - statuses[block_id].bytes_written = s->out - s->outbase; - statuses[block_id].status = s->error; + results[block_id].bytes_written = s->out - s->outbase; + results[block_id].status = + (s->error == 0) ? 
compression_status::SUCCESS : compression_status::FAILURE; // Return ext heap used by last block (statistics) - statuses[block_id].reserved = s->fb_size; + results[block_id].reserved = s->fb_size; } } @@ -2079,7 +2080,7 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs) void gpu_debrotli(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, void* scratch, size_t scratch_size, rmm::cuda_stream_view stream) @@ -2104,7 +2105,7 @@ void gpu_debrotli(device_span const> inputs, cudaMemcpyHostToDevice, stream.value())); gpu_debrotli_kernel<<>>( - inputs, outputs, statuses, scratch_u8, fb_heap_size); + inputs, outputs, results, scratch_u8, fb_heap_size); #if DUMP_FB_HEAP uint32_t dump[2]; uint32_t cur = 0; diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 16f4ea84f7f..dacc5a00d16 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -1020,14 +1020,14 @@ __device__ int parse_gzip_header(const uint8_t* src, size_t src_size) * @tparam block_size Thread block dimension for this call * @param inputs Source and destination buffer information per block * @param outputs Destination buffer information per block - * @param statuses Decompression status buffer per block + * @param results Decompression status buffer per block * @param parse_hdr If nonzero, indicates that the compressed bitstream includes a GZIP header */ template __global__ void __launch_bounds__(block_size) inflate_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, gzip_header_included parse_hdr) { __shared__ __align__(16) inflate_state_s state_g; @@ -1133,9 +1133,15 @@ __global__ void __launch_bounds__(block_size) // Output buffer too small state->err = 1; } - statuses[z].bytes_written = state->out - state->outbase; - statuses[z].status = state->err; - statuses[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes + results[z].bytes_written = state->out - state->outbase; + results[z].status = [&]() { + switch (state->err) { + case 0: return compression_status::SUCCESS; + case 1: return compression_status::OUTPUT_OVERFLOW; + default: return compression_status::FAILURE; + } + }(); + results[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes } } @@ -1200,14 +1206,14 @@ __global__ void __launch_bounds__(1024) void gpuinflate(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, gzip_header_included parse_hdr, rmm::cuda_stream_view stream) { constexpr int block_size = 128; // Threads per block if (inputs.size() > 0) { inflate_kernel - <<>>(inputs, outputs, statuses, parse_hdr); + <<>>(inputs, outputs, results, parse_hdr); } } diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp index 3870b2ac3b3..1b45a31b13b 100644 --- a/cpp/src/io/comp/gpuinflate.hpp +++ b/cpp/src/io/comp/gpuinflate.hpp @@ -26,11 +26,21 @@ namespace cudf { namespace io { /** - * @brief Output parameters for the decompression interface + * @brief Status of a compression/decompression operation. */ -struct decompress_status { +enum class compression_status : uint8_t { + SUCCESS, ///< Successful, output is valid + FAILURE, ///< Failed, output is invalid (e.g. 
input is unsupported in some way) + SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used) + OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output +}; + +/** + * @brief Descriptor of compression/decompression result. + */ +struct compression_result { uint64_t bytes_written; - uint32_t status; + compression_status status; uint32_t reserved; }; @@ -44,13 +54,13 @@ enum class gzip_header_included { NO, YES }; * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] parse_hdr Whether or not to parse GZIP header * @param[in] stream CUDA stream to use */ void gpuinflate(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, gzip_header_included parse_hdr, rmm::cuda_stream_view stream); @@ -73,12 +83,12 @@ void gpu_copy_uncompressed_blocks(device_span const> * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] stream CUDA stream to use */ void gpu_unsnap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream); /** @@ -98,14 +108,14 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs = 0); * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] scratch Temporary memory for intermediate work * @param[in] scratch_size Size in bytes of the temporary memory * @param[in] stream CUDA stream to use */ void gpu_debrotli(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, void* scratch, size_t scratch_size, rmm::cuda_stream_view stream); @@ -118,12 +128,12 @@ void gpu_debrotli(device_span const> inputs, * * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] stream CUDA stream to use */ void gpu_snap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream); } // namespace io diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp index 91deda50cf2..31f7b9b472e 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cpp +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -21,17 +21,29 @@ #include +#define NVCOMP_DEFLATE_HEADER +#if __has_include(NVCOMP_DEFLATE_HEADER) +#include NVCOMP_DEFLATE_HEADER +#endif + #define NVCOMP_ZSTD_HEADER #if __has_include(NVCOMP_ZSTD_HEADER) #include NVCOMP_ZSTD_HEADER -#define NVCOMP_HAS_ZSTD 1 +#endif + +#if NVCOMP_MAJOR_VERSION > 2 or (NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION >= 3) +#define NVCOMP_HAS_ZSTD_DECOMP 1 #else -#define NVCOMP_HAS_ZSTD 0 +#define NVCOMP_HAS_ZSTD_DECOMP 0 #endif -#define NVCOMP_DEFLATE_HEADER -#if __has_include(NVCOMP_DEFLATE_HEADER) -#include NVCOMP_DEFLATE_HEADER +#if NVCOMP_MAJOR_VERSION > 2 or (NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION >= 4) +#define NVCOMP_HAS_ZSTD_COMP 1 +#else +#define NVCOMP_HAS_ZSTD_COMP 0 +#endif + +#if NVCOMP_MAJOR_VERSION > 2 or 
(NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION >= 3) #define NVCOMP_HAS_DEFLATE 1 #else #define NVCOMP_HAS_DEFLATE 0 @@ -63,7 +75,7 @@ nvcompStatus_t batched_decompress_get_temp_size_ex(compression_type compression, case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressGetTempSizeEx(std::forward(args)...); case compression_type::ZSTD: -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP return nvcompBatchedZstdDecompressGetTempSizeEx(std::forward(args)...); #else CUDF_FAIL("Unsupported compression type"); @@ -83,7 +95,7 @@ auto batched_decompress_get_temp_size(compression_type compression, Args&&... ar case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressGetTempSize(std::forward(args)...); case compression_type::ZSTD: -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP return nvcompBatchedZstdDecompressGetTempSize(std::forward(args)...); #else CUDF_FAIL("Unsupported compression type"); @@ -106,7 +118,7 @@ auto batched_decompress_async(compression_type compression, Args&&... args) case compression_type::SNAPPY: return nvcompBatchedSnappyDecompressAsync(std::forward(args)...); case compression_type::ZSTD: -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP return nvcompBatchedZstdDecompressAsync(std::forward(args)...); #else CUDF_FAIL("Unsupported compression type"); @@ -146,14 +158,14 @@ size_t batched_decompress_temp_size(compression_type compression, void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, size_t max_uncomp_chunk_size, size_t max_total_uncomp_size, rmm::cuda_stream_view stream) { // TODO Consolidate config use to a common location if (compression == compression_type::ZSTD) { -#if NVCOMP_HAS_ZSTD +#if NVCOMP_HAS_ZSTD_DECOMP #if NVCOMP_ZSTD_IS_EXPERIMENTAL CUDF_EXPECTS(cudf::io::detail::nvcomp_integration::is_all_enabled(), "Zstandard compression is experimental, you can enable it through " @@ -187,7 +199,7 @@ void batched_decompress(compression_type compression, stream.value()); CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "unable to perform decompression"); - convert_status(nvcomp_statuses, actual_uncompressed_data_sizes, statuses, stream); + update_compression_results(nvcomp_statuses, actual_uncompressed_data_sizes, results, stream); } // Dispatcher for nvcompBatchedCompressGetTempSize @@ -210,7 +222,14 @@ auto batched_compress_temp_size(compression_type compression, #else CUDF_FAIL("Unsupported compression type"); #endif - case compression_type::ZSTD: [[fallthrough]]; + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + nvcomp_status = nvcompBatchedZstdCompressGetTempSize( + batch_size, max_uncompressed_chunk_bytes, nvcompBatchedZstdDefaultOpts, &temp_size); + break; +#else + CUDF_FAIL("Unsupported compression type"); +#endif default: CUDF_FAIL("Unsupported compression type"); } @@ -219,26 +238,36 @@ auto batched_compress_temp_size(compression_type compression, return temp_size; } -// Dispatcher for nvcompBatchedCompressGetMaxOutputChunkSize -size_t batched_compress_get_max_output_chunk_size(compression_type compression, - uint32_t max_uncompressed_chunk_bytes) +size_t compress_max_output_chunk_size(compression_type compression, + uint32_t max_uncompressed_chunk_bytes) { + auto const capped_uncomp_bytes = std::min( + compress_max_allowed_chunk_size(compression).value_or(max_uncompressed_chunk_bytes), + max_uncompressed_chunk_bytes); + size_t max_comp_chunk_size = 0; nvcompStatus_t status = nvcompStatus_t::nvcompSuccess; switch 
(compression) { case compression_type::SNAPPY: status = nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - max_uncompressed_chunk_bytes, nvcompBatchedSnappyDefaultOpts, &max_comp_chunk_size); + capped_uncomp_bytes, nvcompBatchedSnappyDefaultOpts, &max_comp_chunk_size); break; case compression_type::DEFLATE: #if NVCOMP_HAS_DEFLATE status = nvcompBatchedDeflateCompressGetMaxOutputChunkSize( - max_uncompressed_chunk_bytes, nvcompBatchedDeflateDefaultOpts, &max_comp_chunk_size); + capped_uncomp_bytes, nvcompBatchedDeflateDefaultOpts, &max_comp_chunk_size); + break; +#else + CUDF_FAIL("Unsupported compression type"); +#endif + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + status = nvcompBatchedZstdCompressGetMaxOutputChunkSize( + capped_uncomp_bytes, nvcompBatchedZstdDefaultOpts, &max_comp_chunk_size); break; #else CUDF_FAIL("Unsupported compression type"); #endif - case compression_type::ZSTD: [[fallthrough]]; default: CUDF_FAIL("Unsupported compression type"); } @@ -289,26 +318,50 @@ static void batched_compress_async(compression_type compression, #else CUDF_FAIL("Unsupported compression type"); #endif - case compression_type::ZSTD: [[fallthrough]]; + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + nvcomp_status = nvcompBatchedZstdCompressAsync(device_uncompressed_ptrs, + device_uncompressed_bytes, + max_uncompressed_chunk_bytes, + batch_size, + device_temp_ptr, + temp_bytes, + device_compressed_ptrs, + device_compressed_bytes, + nvcompBatchedZstdDefaultOpts, + stream.value()); + break; +#else + CUDF_FAIL("Unsupported compression type"); +#endif default: CUDF_FAIL("Unsupported compression type"); } CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in compression"); } +bool is_aligned(void const* ptr, std::uintptr_t alignment) noexcept +{ + return (reinterpret_cast(ptr) % alignment) == 0; +} + void batched_compress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, - uint32_t max_uncomp_chunk_size, + device_span results, rmm::cuda_stream_view stream) { auto const num_chunks = inputs.size(); + auto nvcomp_args = create_batched_nvcomp_args(inputs, outputs, stream); + + auto const max_uncomp_chunk_size = skip_unsupported_inputs( + nvcomp_args.input_data_sizes, results, compress_max_allowed_chunk_size(compression), stream); + auto const temp_size = batched_compress_temp_size(compression, num_chunks, max_uncomp_chunk_size); rmm::device_buffer scratch(temp_size, stream); + CUDF_EXPECTS(is_aligned(scratch.data(), 8), "Compression failed, misaligned scratch buffer"); rmm::device_uvector actual_compressed_data_sizes(num_chunks, stream); - auto const nvcomp_args = create_batched_nvcomp_args(inputs, outputs, stream); batched_compress_async(compression, nvcomp_args.input_data_ptrs.data(), @@ -321,7 +374,55 @@ void batched_compress(compression_type compression, actual_compressed_data_sizes.data(), stream.value()); - convert_status(std::nullopt, actual_compressed_data_sizes, statuses, stream); + update_compression_results(actual_compressed_data_sizes, results, stream); +} + +bool is_compression_enabled(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: + return NVCOMP_HAS_DEFLATE and detail::nvcomp_integration::is_all_enabled(); + case compression_type::SNAPPY: return detail::nvcomp_integration::is_stable_enabled(); + case compression_type::ZSTD: + return NVCOMP_HAS_ZSTD_COMP and detail::nvcomp_integration::is_all_enabled(); + default: return false; + } + return false; +} 
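[Illustrative aside, not part of the patch] A rough caller-side sketch of how the reworked adapter above is meant to be driven. The helper name `compress_batch_zstd`, the pre-sized spans, and the include paths are assumptions for illustration only; `is_compression_enabled`, `batched_compress`, `compression_result`, and `compression_status` are the entry points introduced in this change.

// Sketch only: drive the nvcomp adapter for one batch of ZSTD chunks.
#include "gpuinflate.hpp"      // compression_result / compression_status (path approximate)
#include "nvcomp_adapter.hpp"  // is_compression_enabled / batched_compress (path approximate)

#include <cudf/utilities/error.hpp>
#include <cudf/utilities/span.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/fill.h>

namespace nvcomp = cudf::io::nvcomp;
using cudf::device_span;
using cudf::io::compression_result;
using cudf::io::compression_status;

// inputs/outputs/results are device spans already sized to the batch (hypothetical caller state).
void compress_batch_zstd(device_span<device_span<uint8_t const> const> inputs,
                         device_span<device_span<uint8_t> const> outputs,
                         device_span<compression_result> results,
                         rmm::cuda_stream_view stream)
{
  CUDF_EXPECTS(nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD),
               "nvCOMP ZSTD compression is disabled in this build/configuration");
  // Pre-fill with FAILURE so blocks the kernels never reach are not mistaken for valid output.
  thrust::fill(rmm::exec_policy(stream),
               results.begin(),
               results.end(),
               compression_result{0, compression_status::FAILURE});
  // Chunks larger than compress_max_allowed_chunk_size() are marked SKIPPED internally
  // (skip_unsupported_inputs) and left uncompressed; the rest end up SUCCESS or FAILURE.
  nvcomp::batched_compress(nvcomp::compression_type::ZSTD, inputs, outputs, results, stream);
}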
+ +size_t compress_input_alignment_bits(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: return 0; + case compression_type::SNAPPY: return 0; + case compression_type::ZSTD: return 2; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +size_t compress_output_alignment_bits(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: return 3; + case compression_type::SNAPPY: return 0; + case compression_type::ZSTD: return 0; + default: CUDF_FAIL("Unsupported compression type"); + } +} + +std::optional compress_max_allowed_chunk_size(compression_type compression) +{ + switch (compression) { + case compression_type::DEFLATE: return 64 * 1024; + case compression_type::SNAPPY: return std::nullopt; + case compression_type::ZSTD: +#if NVCOMP_HAS_ZSTD_COMP + return nvcompZstdCompressionMaxAllowedChunkSize; +#else + CUDF_FAIL("Unsupported compression type"); +#endif + default: return std::nullopt; + } } } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cu b/cpp/src/io/comp/nvcomp_adapter.cu index 30551dc31cf..c3c1bff9073 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cu +++ b/cpp/src/io/comp/nvcomp_adapter.cu @@ -57,31 +57,69 @@ batched_args create_batched_nvcomp_args(device_span c std::move(output_data_sizes)}; } -void convert_status(std::optional> nvcomp_stats, - device_span actual_uncompressed_sizes, - device_span cudf_stats, - rmm::cuda_stream_view stream) +void update_compression_results(device_span nvcomp_stats, + device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream) { - if (nvcomp_stats.has_value()) { - thrust::transform( + thrust::transform_if( + rmm::exec_policy(stream), + nvcomp_stats.begin(), + nvcomp_stats.end(), + actual_output_sizes.begin(), + results.begin(), + results.begin(), + [] __device__(auto const& nvcomp_status, auto const& size) { + return compression_result{size, + nvcomp_status == nvcompStatus_t::nvcompSuccess + ? compression_status::SUCCESS + : compression_status::FAILURE}; + }, + [] __device__(auto const& cudf_status) { + return cudf_status.status != compression_status::SKIPPED; + }); +} + +void update_compression_results(device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream) +{ + thrust::transform_if( + rmm::exec_policy(stream), + actual_output_sizes.begin(), + actual_output_sizes.end(), + results.begin(), + results.begin(), + [] __device__(auto const& size) { return compression_result{size}; }, + [] __device__(auto const& results) { return results.status != compression_status::SKIPPED; }); +} + +size_t skip_unsupported_inputs(device_span input_sizes, + device_span results, + std::optional max_valid_input_size, + rmm::cuda_stream_view stream) +{ + if (max_valid_input_size.has_value()) { + auto status_size_it = thrust::make_zip_iterator(input_sizes.begin(), results.begin()); + thrust::transform_if( rmm::exec_policy(stream), - nvcomp_stats->begin(), - nvcomp_stats->end(), - actual_uncompressed_sizes.begin(), - cudf_stats.begin(), - [] __device__(auto const& status, auto const& size) { - return decompress_status{size, status == nvcompStatus_t::nvcompSuccess ? 
0u : 1u}; + results.begin(), + results.end(), + input_sizes.begin(), + status_size_it, + [] __device__(auto const& status) { + return thrust::pair{0, compression_result{0, compression_status::SKIPPED}}; + }, + [max_size = max_valid_input_size.value()] __device__(size_t input_size) { + return input_size > max_size; }); - } else { - thrust::transform(rmm::exec_policy(stream), - actual_uncompressed_sizes.begin(), - actual_uncompressed_sizes.end(), - cudf_stats.begin(), - [] __device__(size_t size) { - decompress_status status{}; - status.bytes_written = size; - return status; - }); } + + return thrust::reduce(rmm::exec_policy(stream), + input_sizes.begin(), + input_sizes.end(), + 0ul, + thrust::maximum()); } + } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cuh b/cpp/src/io/comp/nvcomp_adapter.cuh index 1cc65d41a51..e49a9a6d348 100644 --- a/cpp/src/io/comp/nvcomp_adapter.cuh +++ b/cpp/src/io/comp/nvcomp_adapter.cuh @@ -48,10 +48,28 @@ batched_args create_batched_nvcomp_args(device_span c rmm::cuda_stream_view stream); /** - * @brief Convert nvcomp statuses into cuIO compression statuses. + * @brief Convert nvcomp statuses and output sizes into cuIO compression results. */ -void convert_status(std::optional> nvcomp_stats, - device_span actual_uncompressed_sizes, - device_span cudf_stats, - rmm::cuda_stream_view stream); +void update_compression_results(device_span nvcomp_stats, + device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Fill the result array based on the actual output sizes. + */ +void update_compression_results(device_span actual_output_sizes, + device_span results, + rmm::cuda_stream_view stream); + +/** + * @brief Mark unsupported input chunks for skipping. + * + * Returns the size of the largest remaining input chunk. + */ +size_t skip_unsupported_inputs(device_span input_sizes, + device_span results, + std::optional max_valid_input_size, + rmm::cuda_stream_view stream); + } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp index 40a85a3ac37..41af564ca76 100644 --- a/cpp/src/io/comp/nvcomp_adapter.hpp +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -18,6 +18,7 @@ #include "gpuinflate.hpp" +#include #include #include @@ -26,13 +27,23 @@ namespace cudf::io::nvcomp { enum class compression_type { SNAPPY, ZSTD, DEFLATE }; +/** + * @brief Whether the given compression type is enabled through nvCOMP. + * + * Result depends on nvCOMP version and environment variables. + * + * @param compression Compression type + * @returns true if nvCOMP use is enabled; false otherwise + */ +[[nodiscard]] bool is_compression_enabled(compression_type compression); + /** * @brief Device batch decompression of given type. 
* * @param[in] compression Compression type * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures + * @param[out] results List of output status structures * @param[in] max_uncomp_chunk_size maximum size of uncompressed chunk * @param[in] max_total_uncomp_size maximum total size of uncompressed data * @param[in] stream CUDA stream to use @@ -40,7 +51,7 @@ enum class compression_type { SNAPPY, ZSTD, DEFLATE }; void batched_decompress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, size_t max_uncomp_chunk_size, size_t max_total_uncomp_size, rmm::cuda_stream_view stream); @@ -51,8 +62,32 @@ void batched_decompress(compression_type compression, * @param compression Compression type * @param max_uncomp_chunk_size Size of the largest uncompressed chunk in the batch */ -size_t batched_compress_get_max_output_chunk_size(compression_type compression, - uint32_t max_uncomp_chunk_size); +[[nodiscard]] size_t compress_max_output_chunk_size(compression_type compression, + uint32_t max_uncomp_chunk_size); + +/** + * @brief Gets input alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment, in bits + */ +[[nodiscard]] size_t compress_input_alignment_bits(compression_type compression); + +/** + * @brief Gets output alignment requirements for the given compression type. + * + * @param compression Compression type + * @returns required alignment, in bits + */ +[[nodiscard]] size_t compress_output_alignment_bits(compression_type compression); + +/** + * @brief Maximum size of uncompressed chunks that can be compressed with nvCOMP. + * + * @param compression Compression type + * @returns maximum chunk size + */ +[[nodiscard]] std::optional compress_max_allowed_chunk_size(compression_type compression); /** * @brief Device batch compression of given type. @@ -60,15 +95,13 @@ size_t batched_compress_get_max_output_chunk_size(compression_type compression, * @param[in] compression Compression type * @param[in] inputs List of input buffers * @param[out] outputs List of output buffers - * @param[out] statuses List of output status structures - * @param[in] max_uncomp_chunk_size Size of the largest uncompressed chunk in the batch + * @param[out] results List of output status structures * @param[in] stream CUDA stream to use */ void batched_compress(compression_type compression, device_span const> inputs, device_span const> outputs, - device_span statuses, - uint32_t max_uncomp_chunk_size, + device_span results, rmm::cuda_stream_view stream); } // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 820a7f937d7..6c7ab490751 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -260,7 +260,7 @@ static __device__ uint32_t Match60(const uint8_t* src1, __global__ void __launch_bounds__(128) snap_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses) + device_span results) { __shared__ __align__(16) snap_state_s state_g; @@ -337,21 +337,22 @@ __global__ void __launch_bounds__(128) } __syncthreads(); if (!t) { - statuses[blockIdx.x].bytes_written = s->dst - s->dst_base; - statuses[blockIdx.x].status = (s->dst > s->end) ? 1 : 0; - statuses[blockIdx.x].reserved = 0; + results[blockIdx.x].bytes_written = s->dst - s->dst_base; + results[blockIdx.x].status = + (s->dst > s->end) ? 
compression_status::FAILURE : compression_status::SUCCESS; + results[blockIdx.x].reserved = 0; } } void gpu_snap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block dim3 dim_grid(inputs.size(), 1); if (inputs.size() > 0) { - snap_kernel<<>>(inputs, outputs, statuses); + snap_kernel<<>>(inputs, outputs, results); } } diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp index 6f33c9f1de9..8e58f86317c 100644 --- a/cpp/src/io/comp/uncomp.cpp +++ b/cpp/src/io/comp/uncomp.cpp @@ -520,7 +520,9 @@ size_t decompress_zstd(host_span src, hd_dsts[0] = d_dst; hd_dsts.host_to_device(stream); - auto hd_stats = hostdevice_vector(1, stream); + auto hd_stats = hostdevice_vector(1, stream); + hd_stats[0] = compression_result{0, compression_status::FAILURE}; + hd_stats.host_to_device(stream); auto const max_uncomp_page_size = dst.size(); nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, hd_srcs, @@ -531,7 +533,7 @@ size_t decompress_zstd(host_span src, stream); hd_stats.device_to_host(stream, true); - CUDF_EXPECTS(hd_stats[0].status == 0, "ZSTD decompression failed"); + CUDF_EXPECTS(hd_stats[0].status == compression_status::SUCCESS, "ZSTD decompression failed"); // Copy temporary output to `dst` CUDF_CUDA_TRY(cudaMemcpyAsync( diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 98011a57ea8..8b13ddd1de4 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -627,7 +627,7 @@ template __global__ void __launch_bounds__(block_size) unsnap_kernel(device_span const> inputs, device_span const> outputs, - device_span statuses) + device_span results) { __shared__ __align__(16) unsnap_state_s state_g; __shared__ cub::WarpReduce::TempStorage temp_storage; @@ -698,25 +698,26 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); } if (!t) { - statuses[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; - statuses[strm_id].status = s->error; + results[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; + results[strm_id].status = + (s->error == 0) ? 
compression_status::SUCCESS : compression_status::FAILURE; if (log_cyclecount) { - statuses[strm_id].reserved = clock() - s->tstart; + results[strm_id].reserved = clock() - s->tstart; } else { - statuses[strm_id].reserved = 0; + results[strm_id].reserved = 0; } } } void gpu_unsnap(device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block dim3 dim_grid(inputs.size(), 1); // TODO: Check max grid dimensions vs max expected count - unsnap_kernel<128><<>>(inputs, outputs, statuses); + unsnap_kernel<128><<>>(inputs, outputs, results); } } // namespace io diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 858f7682b11..a007750d264 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -38,12 +38,12 @@ namespace cudf { namespace io { namespace orc { struct PostScript { - uint64_t footerLength = 0; // the length of the footer section in bytes - CompressionKind compression = NONE; // the kind of generic compression used - uint32_t compressionBlockSize = 256 * 1024; // the maximum size of each compression chunk - std::vector version; // the version of the writer [major, minor] - uint64_t metadataLength = 0; // the length of the metadata section in bytes - std::string magic = ""; // the fixed string "ORC" + uint64_t footerLength = 0; // the length of the footer section in bytes + CompressionKind compression = NONE; // the kind of generic compression used + uint32_t compressionBlockSize{}; // the maximum size of each compression chunk + std::vector version; // the version of the writer [major, minor] + uint64_t metadataLength = 0; // the length of the metadata section in bytes + std::string magic = ""; // the fixed string "ORC" }; struct StripeInformation { diff --git a/cpp/src/io/orc/orc_common.hpp b/cpp/src/io/orc/orc_common.hpp index 29a4ad6ed78..c2898b362a6 100644 --- a/cpp/src/io/orc/orc_common.hpp +++ b/cpp/src/io/orc/orc_common.hpp @@ -24,13 +24,6 @@ namespace orc { static constexpr uint32_t block_header_size = 3; -constexpr uint32_t compressed_block_size(uint32_t compressed_data_size) -{ - return ((compressed_data_size + block_header_size + 0xFF) & ~0xFF); -} - -static constexpr uint32_t padded_block_header_size = compressed_block_size(0); - enum CompressionKind : uint8_t { NONE = 0, ZLIB = 1, diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 9de7dfffc0c..c7a7a423cf2 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -56,12 +56,12 @@ struct CompressedStreamInfo { } const uint8_t* compressed_data; // [in] base ptr to compressed stream data uint8_t* uncompressed_data; // [in] base ptr to uncompressed stream data or NULL if not known yet - size_t compressed_data_size; // [in] compressed data size for this stream - device_span* dec_in_ctl; // [in] input buffer to decompress - device_span* dec_out_ctl; // [in] output buffer to decompress into - device_span decstatus; // [in] results of decompression - device_span* copy_in_ctl; // [out] input buffer to copy - device_span* copy_out_ctl; // [out] output buffer to copy to + size_t compressed_data_size; // [in] compressed data size for this stream + device_span* dec_in_ctl; // [in] input buffer to decompress + device_span* dec_out_ctl; // [in] output buffer to decompress into + device_span dec_res; // [in] results of decompression + device_span* copy_in_ctl; // [out] input buffer to copy + device_span* copy_out_ctl; // [out] output buffer 
to copy to uint32_t num_compressed_blocks; // [in,out] number of entries in decctl(in), number of compressed // blocks(out) uint32_t num_uncompressed_blocks; // [in,out] number of entries in dec_in_ctl(in), number of @@ -348,11 +348,10 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @param[in] compression Type of compression * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression + * @param[in] comp_block_align Required alignment for compressed blocks * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in,out] enc_streams chunk streams device array [column][rowgroup] - * @param[out] comp_in Per-block compression input buffers - * @param[out] comp_out Per-block compression output buffers - * @param[out] comp_stat Per-block compression status + * @param[out] comp_res Per-block compression status * @param[in] stream CUDA stream used for device memory operations and kernel launches */ void CompressOrcDataStreams(uint8_t* compressed_data, @@ -360,11 +359,10 @@ void CompressOrcDataStreams(uint8_t* compressed_data, CompressionKind compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, + uint32_t comp_block_align, device_2dspan strm_desc, device_2dspan enc_streams, - device_span> comp_in, - device_span> comp_out, - device_span comp_stat, + device_span comp_res, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index c79aa5d7a4f..7ff3ee85939 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -262,26 +262,26 @@ auto decimal_column_type(std::vector const& decimal128_columns, } // namespace -__global__ void decompress_check_kernel(device_span stats, +__global__ void decompress_check_kernel(device_span results, bool* any_block_failure) { auto tid = blockIdx.x * blockDim.x + threadIdx.x; - if (tid < stats.size()) { - if (stats[tid].status != 0) { + if (tid < results.size()) { + if (results[tid].status != compression_status::SUCCESS) { *any_block_failure = true; // Doesn't need to be atomic } } } -void decompress_check(device_span stats, +void decompress_check(device_span results, bool* any_block_failure, rmm::cuda_stream_view stream) { - if (stats.empty()) { return; } // early exit for empty stats + if (results.empty()) { return; } // early exit for empty results dim3 block(128); - dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast(block.x))); - decompress_check_kernel<<>>(stats, any_block_failure); + dim3 grid(cudf::util::div_rounding_up_safe(results.size(), static_cast(block.x))); + decompress_check_kernel<<>>(results, any_block_failure); } rmm::device_buffer reader::impl::decompress_stripe_data( @@ -337,7 +337,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data( num_compressed_blocks + num_uncompressed_blocks, stream); rmm::device_uvector> inflate_out( num_compressed_blocks + num_uncompressed_blocks, stream); - rmm::device_uvector inflate_stats(num_compressed_blocks, stream); + rmm::device_uvector inflate_res(num_compressed_blocks, stream); + thrust::fill(rmm::exec_policy(stream), + inflate_res.begin(), + inflate_res.end(), + compression_result{0, compression_status::FAILURE}); // Parse again to populate the decompression input/output buffers size_t decomp_offset = 0; @@ -349,8 +353,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data( compinfo[i].uncompressed_data = dst_base + decomp_offset; compinfo[i].dec_in_ctl = inflate_in.data() + start_pos; 
compinfo[i].dec_out_ctl = inflate_out.data() + start_pos; - compinfo[i].decstatus = {inflate_stats.data() + start_pos, compinfo[i].num_compressed_blocks}; - compinfo[i].copy_in_ctl = inflate_in.data() + start_pos_uncomp; + compinfo[i].dec_res = {inflate_res.data() + start_pos, compinfo[i].num_compressed_blocks}; + compinfo[i].copy_in_ctl = inflate_in.data() + start_pos_uncomp; compinfo[i].copy_out_ctl = inflate_out.data() + start_pos_uncomp; stream_info[i].dst_pos = decomp_offset; @@ -379,13 +383,13 @@ rmm::device_buffer reader::impl::decompress_stripe_data( nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE, inflate_in_view, inflate_out_view, - inflate_stats, + inflate_res, max_uncomp_block_size, total_decomp_size, stream); } else { gpuinflate( - inflate_in_view, inflate_out_view, inflate_stats, gzip_header_included::NO, stream); + inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream); } break; case compression_type::SNAPPY: @@ -393,26 +397,26 @@ rmm::device_buffer reader::impl::decompress_stripe_data( nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, inflate_in_view, inflate_out_view, - inflate_stats, + inflate_res, max_uncomp_block_size, total_decomp_size, stream); } else { - gpu_unsnap(inflate_in_view, inflate_out_view, inflate_stats, stream); + gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream); } break; case compression_type::ZSTD: nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, inflate_in_view, inflate_out_view, - inflate_stats, + inflate_res, max_uncomp_block_size, total_decomp_size, stream); break; default: CUDF_FAIL("Unexpected decompression dispatch"); break; } - decompress_check(inflate_stats, any_block_failure.device_ptr(), stream); + decompress_check(inflate_res, any_block_failure.device_ptr(), stream); } if (num_uncompressed_blocks > 0) { device_span> copy_in_view{inflate_in.data() + num_compressed_blocks, diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 5e9a6f8df6b..b1c04099e64 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -17,14 +17,16 @@ #include "orc_common.hpp" #include "orc_gpu.hpp" -#include -#include -#include #include #include #include #include +#include +#include +#include +#include + #include #include #include @@ -1142,10 +1144,11 @@ __global__ void __launch_bounds__(1024) * @param[in] chunks EncChunk device array [rowgroup][column] * @param[out] inputs Per-block compression input buffers * @param[out] outputs Per-block compression output buffers - * @param[out] statuses Per-block compression status + * @param[out] results Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression + * @param[in] comp_block_align Required alignment for compressed blocks */ // blockDim {256,1,1} __global__ void __launch_bounds__(256) @@ -1153,14 +1156,18 @@ __global__ void __launch_bounds__(256) device_2dspan streams, // const? 
device_span> inputs, device_span> outputs, - device_span statuses, + device_span results, uint8_t* compressed_bfr, uint32_t comp_blk_size, - uint32_t max_comp_blk_size) + uint32_t max_comp_blk_size, + uint32_t comp_block_align) { __shared__ __align__(16) StripeStream ss; __shared__ uint8_t* volatile uncomp_base_g; + auto const padded_block_header_size = util::round_up_unsafe(block_header_size, comp_block_align); + auto const padded_comp_block_size = util::round_up_unsafe(max_comp_blk_size, comp_block_align); + auto const stripe_id = blockIdx.x; auto const stream_id = blockIdx.y; uint32_t t = threadIdx.x; @@ -1177,10 +1184,10 @@ __global__ void __launch_bounds__(256) num_blocks = (ss.stream_size > 0) ? (ss.stream_size - 1) / comp_blk_size + 1 : 1; for (uint32_t b = t; b < num_blocks; b += 256) { uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); - inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size}; - auto const dst_offset = b * compressed_block_size(max_comp_blk_size) + padded_block_header_size; - outputs[ss.first_block + b] = {dst + dst_offset, max_comp_blk_size}; - statuses[ss.first_block + b] = {blk_size, 1, 0}; + inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size}; + auto const dst_offset = b * (padded_block_header_size + padded_comp_block_size); + outputs[ss.first_block + b] = {dst + dst_offset, max_comp_blk_size}; + results[ss.first_block + b] = {0, compression_status::FAILURE}; } } @@ -1190,9 +1197,9 @@ __global__ void __launch_bounds__(256) * * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[out] inputs Per-block compression input buffers + * @param[in] inputs Per-block compression input buffers * @param[out] outputs Per-block compression output buffers - * @param[out] statuses Per-block compression status + * @param[out] results Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression @@ -1202,7 +1209,7 @@ __global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(device_2dspan strm_desc, device_span const> inputs, device_span const> outputs, - device_span statuses, + device_span results, uint8_t* compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) @@ -1228,16 +1235,16 @@ __global__ void __launch_bounds__(1024) if (t == 0) { auto const src_len = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); - auto dst_len = (statuses[ss.first_block + b].status == 0) - ? statuses[ss.first_block + b].bytes_written + auto dst_len = (results[ss.first_block + b].status == compression_status::SUCCESS) + ? 
results[ss.first_block + b].bytes_written : src_len; uint32_t blk_size24{}; - if (statuses[ss.first_block + b].status == 0) { + if (results[ss.first_block + b].status == compression_status::SUCCESS) { // Copy from uncompressed source - src = inputs[ss.first_block + b].data(); - statuses[ss.first_block + b].bytes_written = src_len; - dst_len = src_len; - blk_size24 = dst_len * 2 + 1; + src = inputs[ss.first_block + b].data(); + results[ss.first_block + b].bytes_written = src_len; + dst_len = src_len; + blk_size24 = dst_len * 2 + 1; } else { // Compressed block src = outputs[ss.first_block + b].data(); @@ -1307,51 +1314,59 @@ void CompressOrcDataStreams(uint8_t* compressed_data, CompressionKind compression, uint32_t comp_blk_size, uint32_t max_comp_blk_size, + uint32_t comp_block_align, device_2dspan strm_desc, device_2dspan enc_streams, - device_span> comp_in, - device_span> comp_out, - device_span comp_stat, + device_span comp_res, rmm::cuda_stream_view stream) { + rmm::device_uvector> comp_in(num_compressed_blocks, stream); + rmm::device_uvector> comp_out(num_compressed_blocks, stream); + dim3 dim_block_init(256, 1); dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); gpuInitCompressionBlocks<<>>(strm_desc, enc_streams, comp_in, comp_out, - comp_stat, + comp_res, compressed_data, comp_blk_size, - max_comp_blk_size); + max_comp_blk_size, + comp_block_align); if (compression == SNAPPY) { try { - if (detail::nvcomp_integration::is_stable_enabled()) { + if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) { nvcomp::batched_compress( - nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_stat, comp_blk_size, stream); + nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); } else { - gpu_snap(comp_in, comp_out, comp_stat, stream); + gpu_snap(comp_in, comp_out, comp_res, stream); } } catch (...) 
{ // There was an error in compressing so set an error status for each block - thrust::for_each(rmm::exec_policy(stream), - comp_stat.begin(), - comp_stat.end(), - [] __device__(decompress_status & stat) { stat.status = 1; }); + thrust::for_each( + rmm::exec_policy(stream), + comp_res.begin(), + comp_res.end(), + [] __device__(compression_result & stat) { stat.status = compression_status::FAILURE; }); // Since SNAPPY is the default compression (may not be explicitly requested), fall back to // writing without compression } - } else if (compression == ZLIB and detail::nvcomp_integration::is_all_enabled()) { + } else if (compression == ZLIB and + nvcomp::is_compression_enabled(nvcomp::compression_type::DEFLATE)) { nvcomp::batched_compress( - nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_stat, comp_blk_size, stream); + nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream); + } else if (compression == ZSTD and + nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) { + nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); } else if (compression != NONE) { CUDF_FAIL("Unsupported compression type"); } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( - strm_desc, comp_in, comp_out, comp_stat, compressed_data, comp_blk_size, max_comp_blk_size); + strm_desc, comp_in, comp_out, comp_res, compressed_data, comp_blk_size, max_comp_blk_size); } } // namespace gpu diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index edae60bfa6d..bd65089810e 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -160,7 +160,7 @@ __global__ void __launch_bounds__(128, 8) const uint8_t* cur = s->info.compressed_data; const uint8_t* end = cur + s->info.compressed_data_size; auto dec_out = s->info.dec_out_ctl; - auto dec_status = s->info.decstatus; + auto dec_result = s->info.dec_res; uint8_t* uncompressed_actual = s->info.uncompressed_data; uint8_t* uncompressed_estimated = uncompressed_actual; uint32_t num_compressed_blocks = 0; @@ -178,13 +178,9 @@ __global__ void __launch_bounds__(128, 8) uncompressed_size_actual = block_len; } else { if (num_compressed_blocks > max_compressed_blocks) { break; } - if (shuffle((lane_id == 0) ? dec_status[num_compressed_blocks].status : 0) != 0) { - // Decompression failed, not much point in doing anything else - break; - } uint32_t const dst_size = dec_out[num_compressed_blocks].size(); uncompressed_size_est = shuffle((lane_id == 0) ? dst_size : 0); - uint32_t const bytes_written = dec_status[num_compressed_blocks].bytes_written; + uint32_t const bytes_written = dec_result[num_compressed_blocks].bytes_written; uncompressed_size_actual = shuffle((lane_id == 0) ? 
bytes_written : 0); } // In practice, this should never happen with a well-behaved writer, as we would expect the @@ -383,7 +379,7 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, const uint8_t* start = s->strm_info[ci_id].compressed_data; const uint8_t* cur = start; const uint8_t* end = cur + s->strm_info[ci_id].compressed_data_size; - auto decstatus = s->strm_info[ci_id].decstatus.data(); + auto dec_result = s->strm_info[ci_id].dec_res.data(); uint32_t uncomp_offset = 0; for (;;) { uint32_t block_len; @@ -400,8 +396,8 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, if (is_uncompressed) { uncomp_offset += block_len; } else { - uncomp_offset += decstatus->bytes_written; - decstatus++; + uncomp_offset += dec_result->bytes_written; + dec_result++; } } s->rowgroups[t].strm_offset[ci_id] += uncomp_offset; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 90858ac6fcc..a5e9e9da4cb 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -85,7 +85,18 @@ template using pinned_buffer = std::unique_ptr; /** - * @brief Function that translates GDF compression to ORC compression + * @brief Translates ORC compression to nvCOMP compression + */ +auto to_nvcomp_compression_type(CompressionKind compression_kind) +{ + if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; + if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; + if (compression_kind == ZSTD) return nvcomp::compression_type::ZSTD; + CUDF_FAIL("Unsupported compression type"); +} + +/** + * @brief Translates cuDF compression to ORC compression */ orc::CompressionKind to_orc_compression(compression_type compression) { @@ -93,27 +104,30 @@ orc::CompressionKind to_orc_compression(compression_type compression) case compression_type::AUTO: case compression_type::SNAPPY: return orc::CompressionKind::SNAPPY; case compression_type::ZLIB: return orc::CompressionKind::ZLIB; + case compression_type::ZSTD: return orc::CompressionKind::ZSTD; case compression_type::NONE: return orc::CompressionKind::NONE; - default: CUDF_FAIL("Unsupported compression type"); return orc::CompressionKind::NONE; + default: CUDF_FAIL("Unsupported compression type"); } } /** * @brief Returns the block size for a given compression kind. - * - * The nvCOMP ZLIB compression is limited to blocks up to 64KiB. */ constexpr size_t compression_block_size(orc::CompressionKind compression) { - switch (compression) { - case orc::CompressionKind::NONE: return 0; - case orc::CompressionKind::ZLIB: return 64 * 1024; - default: return 256 * 1024; - } + if (compression == orc::CompressionKind::NONE) { return 0; } + + auto const ncomp_type = to_nvcomp_compression_type(compression); + auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type) + ? 
nvcomp::compress_max_allowed_chunk_size(ncomp_type) + : std::nullopt; + + constexpr size_t max_block_size = 256 * 1024; + return std::min(nvcomp_limit.value_or(max_block_size), max_block_size); } /** - * @brief Function that translates GDF dtype to ORC datatype + * @brief Translates cuDF dtype to ORC datatype */ constexpr orc::TypeKind to_orc_type(cudf::type_id id, bool list_column_as_map) { @@ -520,6 +534,26 @@ constexpr size_t RLE_stream_size(TypeKind kind, size_t count) } } +auto uncomp_block_alignment(CompressionKind compression_kind) +{ + if (compression_kind == NONE or + not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) { + return 1u; + } + + return 1u << nvcomp::compress_input_alignment_bits(to_nvcomp_compression_type(compression_kind)); +} + +auto comp_block_alignment(CompressionKind compression_kind) +{ + if (compression_kind == NONE or + not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) { + return 1u; + } + + return 1u << nvcomp::compress_output_alignment_bits(to_nvcomp_compression_type(compression_kind)); +} + orc_streams writer::impl::create_streams(host_span columns, file_segmentation const& segmentation, std::map const& decimal_column_sizes) @@ -565,9 +599,13 @@ orc_streams writer::impl::create_streams(host_span columns, auto add_stream = [&](gpu::StreamIndexType index_type, StreamKind kind, TypeKind type_kind, size_t size) { - const auto base = column.index() * gpu::CI_NUM_STREAMS; - ids[base + index_type] = streams.size(); - streams.push_back(orc::Stream{kind, column.id(), size}); + auto const max_alignment_padding = uncomp_block_alignment(compression_kind_) - 1; + const auto base = column.index() * gpu::CI_NUM_STREAMS; + ids[base + index_type] = streams.size(); + streams.push_back(orc::Stream{ + kind, + column.id(), + (size == 0) ? 
0 : size + max_alignment_padding * segmentation.num_rowgroups()}); types.push_back(type_kind); }; @@ -868,6 +906,7 @@ encoded_data encode_columns(orc_table_view const& orc_table, encoder_decimal_info&& dec_chunk_sizes, file_segmentation const& segmentation, orc_streams const& streams, + uint32_t uncomp_block_align, rmm::cuda_stream_view stream) { auto const num_columns = orc_table.num_columns(); @@ -1020,10 +1059,16 @@ encoded_data encode_columns(orc_table_view const& orc_table, strm.lengths[strm_type] = 0; strm.data_ptrs[strm_type] = nullptr; } + auto const misalignment = + reinterpret_cast(strm.data_ptrs[strm_type]) % uncomp_block_align; + if (misalignment != 0) { + strm.data_ptrs[strm_type] += (uncomp_block_align - misalignment); + } } } } } + chunk_streams.host_to_device(stream); if (orc_table.num_rows() > 0) { @@ -1340,7 +1385,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, - host_span comp_out, + host_span comp_res, std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, @@ -1365,17 +1410,17 @@ void writer::impl::write_index_stream(int32_t stripe_id, } return record; }; - auto scan_record = [=, &comp_out](gpu::encoder_chunk_streams const& stream, + auto scan_record = [=, &comp_res](gpu::encoder_chunk_streams const& stream, gpu::StreamIndexType type, row_group_index_info& record) { if (record.pos >= 0) { record.pos += stream.lengths[type]; while ((record.pos >= 0) && (record.blk_pos >= 0) && (static_cast(record.pos) >= compression_blocksize_) && - (record.comp_pos + block_header_size + comp_out[record.blk_pos].bytes_written < + (record.comp_pos + block_header_size + comp_res[record.blk_pos].bytes_written < static_cast(record.comp_size))) { record.pos -= compression_blocksize_; - record.comp_pos += block_header_size + comp_out[record.blk_pos].bytes_written; + record.comp_pos += block_header_size + comp_res[record.blk_pos].bytes_written; record.blk_pos += 1; } } @@ -2007,20 +2052,12 @@ __global__ void copy_string_data(char* string_pool, } } -auto to_nvcomp_compression_type(CompressionKind compression_kind) -{ - if (compression_kind == SNAPPY) return nvcomp::compression_type::SNAPPY; - if (compression_kind == ZLIB) return nvcomp::compression_type::DEFLATE; - CUDF_FAIL("Unsupported compression type"); -} - -size_t get_compress_max_output_chunk_size(CompressionKind compression_kind, - uint32_t compression_blocksize) +size_t max_compression_output_size(CompressionKind compression_kind, uint32_t compression_blocksize) { if (compression_kind == NONE) return 0; - return batched_compress_get_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), - compression_blocksize); + return compress_max_output_chunk_size(to_nvcomp_compression_type(compression_kind), + compression_blocksize); } void writer::impl::persisted_statistics::persist(int num_table_rows, @@ -2124,10 +2161,16 @@ void writer::impl::write(table_view const& table) auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream); + auto const uncomp_block_align = uncomp_block_alignment(compression_kind_); auto streams = create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes)); - auto enc_data = encode_columns( - orc_table, std::move(dictionaries), std::move(dec_chunk_sizes), segmentation, streams, stream); + auto enc_data = encode_columns(orc_table, + std::move(dictionaries), + std::move(dec_chunk_sizes), + segmentation, + streams, + uncomp_block_align, 
+ stream); // Assemble individual disparate column chunks into contiguous data streams size_type const num_index_streams = (orc_table.num_columns() + 1); @@ -2140,8 +2183,13 @@ void writer::impl::write(table_view const& table) // Allocate intermediate output stream buffer size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; + auto const max_compressed_block_size = - get_compress_max_output_chunk_size(compression_kind_, compression_blocksize_); + max_compression_output_size(compression_kind_, compression_blocksize_); + auto const padded_max_compressed_block_size = + util::round_up_unsafe(max_compressed_block_size, uncomp_block_align); + auto const padded_block_header_size = + util::round_up_unsafe(block_header_size, uncomp_block_align); auto stream_output = [&]() { size_t max_stream_size = 0; @@ -2158,7 +2206,8 @@ void writer::impl::write(table_view const& table) (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); stream_size += num_blocks * block_header_size; num_compressed_blocks += num_blocks; - compressed_bfr_size += compressed_block_size(max_compressed_block_size) * num_blocks; + compressed_bfr_size += + (padded_block_header_size + padded_max_compressed_block_size) * num_blocks; } max_stream_size = std::max(max_stream_size, stream_size); } @@ -2177,9 +2226,11 @@ void writer::impl::write(table_view const& table) // Compress the data streams rmm::device_buffer compressed_data(compressed_bfr_size, stream); - hostdevice_vector> comp_in(num_compressed_blocks, stream); - hostdevice_vector> comp_out(num_compressed_blocks, stream); - hostdevice_vector comp_stats(num_compressed_blocks, stream); + hostdevice_vector comp_results(num_compressed_blocks, stream); + thrust::fill(rmm::exec_policy(stream), + comp_results.d_begin(), + comp_results.d_end(), + compression_result{0, compression_status::FAILURE}); if (compression_kind_ != NONE) { strm_descs.host_to_device(stream); gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), @@ -2187,14 +2238,13 @@ void writer::impl::write(table_view const& table) compression_kind_, compression_blocksize_, max_compressed_block_size, + comp_block_alignment(compression_kind_), strm_descs, enc_data.streams, - comp_in, - comp_out, - comp_stats, + comp_results, stream); strm_descs.device_to_host(stream); - comp_stats.device_to_host(stream, true); + comp_results.device_to_host(stream, true); } ProtobufWriter pbw_(&buffer_); @@ -2221,7 +2271,7 @@ void writer::impl::write(table_view const& table) segmentation, enc_data.streams, strm_descs, - comp_stats, + comp_results, intermediate_stats.rowgroup_blobs, &stripe, &streams, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index ed360a77632..dc8aad33af0 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -390,7 +390,7 @@ class writer::impl { file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, - host_span comp_out, + host_span comp_out, std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index f06488671c3..77984ee3c27 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -228,7 +228,8 @@ __global__ void __launch_bounds__(128) statistics_merge_group* chunk_grstats, int32_t num_columns, size_t max_page_size_bytes, - size_type max_page_size_rows) + size_type max_page_size_rows, + uint32_t page_align) { // TODO: All writing seems to be done by thread 0. 
Could be replaced by thrust foreach __shared__ __align__(8) parquet_column_device_view col_g; @@ -284,7 +285,8 @@ __global__ void __launch_bounds__(128) page_g.num_rows = ck_g.num_dict_entries; page_g.num_leaf_values = ck_g.num_dict_entries; page_g.num_values = ck_g.num_dict_entries; // TODO: shouldn't matter for dict page - page_offset += page_g.max_hdr_size + page_g.max_data_size; + page_offset += + util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align); if (not comp_page_sizes.empty()) { comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page]; } @@ -360,7 +362,8 @@ __global__ void __launch_bounds__(128) } page_g.max_hdr_size += stats_hdr_len; } - page_g.page_data = ck_g.uncompressed_bfr + page_offset; + page_g.max_hdr_size = util::round_up_unsafe(page_g.max_hdr_size, page_align); + page_g.page_data = ck_g.uncompressed_bfr + page_offset; if (not comp_page_sizes.empty()) { page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset; } @@ -384,7 +387,8 @@ __global__ void __launch_bounds__(128) pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; - page_offset += page_g.max_hdr_size + page_g.max_data_size; + page_offset += + util::round_up_unsafe(page_g.max_hdr_size + page_g.max_data_size, page_align); if (not comp_page_sizes.empty()) { comp_page_offset += page_g.max_hdr_size + comp_page_sizes[ck_g.first_page + num_pages]; } @@ -422,7 +426,7 @@ __global__ void __launch_bounds__(128) __syncwarp(); if (!t) { if (ck_g.ck_stat_size == 0 && ck_g.stats) { - uint32_t ck_stat_size = 48 + 2 * ck_max_stats_len; + uint32_t ck_stat_size = util::round_up_unsafe(48 + 2 * ck_max_stats_len, page_align); page_offset += ck_stat_size; comp_page_offset += ck_stat_size; ck_g.ck_stat_size = ck_stat_size; @@ -866,7 +870,7 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_stats) + device_span comp_results) { __shared__ __align__(8) page_enc_state_s state_g; using block_scan = cub::BlockScan; @@ -1213,18 +1217,17 @@ __global__ void __launch_bounds__(128, 8) } } if (t == 0) { - uint8_t* base = s->page.page_data + s->page.max_hdr_size; - auto actual_data_size = static_cast(s->cur - base); - uint32_t compressed_bfr_size = GetMaxCompressedBfrSize(actual_data_size); - s->page.max_data_size = actual_data_size; + uint8_t* base = s->page.page_data + s->page.max_hdr_size; + auto actual_data_size = static_cast(s->cur - base); + s->page.max_data_size = actual_data_size; if (not comp_in.empty()) { comp_in[blockIdx.x] = {base, actual_data_size}; - comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size, compressed_bfr_size}; + comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size, 0}; // size is unused } pages[blockIdx.x] = s->page; - if (not comp_stats.empty()) { - comp_stats[blockIdx.x] = {0, ~0u}; - pages[blockIdx.x].comp_stat = &comp_stats[blockIdx.x]; + if (not comp_results.empty()) { + comp_results[blockIdx.x] = {0, compression_status::FAILURE}; + pages[blockIdx.x].comp_res = &comp_results[blockIdx.x]; } } } @@ -1257,10 +1260,10 @@ __global__ void __launch_bounds__(128) gpuDecideCompression(device_spanbytes_written; - if (comp_status->status != 0) { atomicAdd(&error_count, 1); } + compressed_data_size += comp_res->bytes_written; + if (comp_res->status != compression_status::SUCCESS) { atomicAdd(&error_count, 1); } } } uncompressed_data_size = warp_reduce(temp_storage[0]).Sum(uncompressed_data_size); 
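To make the alignment bookkeeping above concrete, here is a minimal host-side sketch of the same arithmetic; the round_up helper is an illustrative stand-in for util::round_up_unsafe and the concrete sizes are made up. Each page's header-plus-data footprint is rounded up to the compressor's input alignment before the next page offset is assigned.

#include <cassert>
#include <cstdint>

// Illustrative stand-in for cudf::util::round_up_unsafe: round `value` up to the
// next multiple of `align` (align > 0, no overflow checking).
constexpr uint32_t round_up(uint32_t value, uint32_t align)
{
  return ((value + align - 1) / align) * align;
}

int main()
{
  uint32_t const page_align = 8;  // e.g. 1u << compress_input_alignment_bits(codec)
  uint32_t page_offset      = 0;

  // First page: 37-byte header + 1000 bytes of data, padded before the next page starts.
  page_offset += round_up(37 + 1000, page_align);
  assert(page_offset == 1040);
  assert(page_offset % page_align == 0);  // next page begins on an aligned offset
  return 0;
}

Padding every page this way keeps each buffer handed to the batched compressor on the alignment it reports via compress_input_alignment_bits.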
@@ -1677,7 +1680,7 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start, // blockDim(128, 1, 1) __global__ void __launch_bounds__(128) gpuEncodePageHeaders(device_span pages, - device_span comp_stat, + device_span comp_results, device_span page_stats, const statistics_chunk* chunk_stats) { @@ -1706,7 +1709,7 @@ __global__ void __launch_bounds__(128) uncompressed_page_size = page_g.max_data_size; if (ck_g.is_compressed) { hdr_start = page_g.compressed_data; - compressed_page_size = (uint32_t)comp_stat[blockIdx.x].bytes_written; + compressed_page_size = (uint32_t)comp_results[blockIdx.x].bytes_written; page_g.max_data_size = compressed_page_size; } else { hdr_start = page_g.page_data; @@ -2041,6 +2044,7 @@ void InitEncoderPages(device_2dspan chunks, int32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + uint32_t page_align, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream) @@ -2056,19 +2060,21 @@ void InitEncoderPages(device_2dspan chunks, chunk_grstats, num_columns, max_page_size_bytes, - max_page_size_rows); + max_page_size_rows, + page_align); } void EncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_stats, + device_span comp_results, rmm::cuda_stream_view stream) { auto num_pages = pages.size(); // A page is part of one column. This is launching 1 block per page. 1 block will exclusively // deal with one datatype. - gpuEncodePages<128><<>>(pages, comp_in, comp_out, comp_stats); + gpuEncodePages<128> + <<>>(pages, comp_in, comp_out, comp_results); } void DecideCompression(device_span chunks, rmm::cuda_stream_view stream) @@ -2077,7 +2083,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view } void EncodePageHeaders(device_span pages, - device_span comp_stats, + device_span comp_results, device_span page_stats, const statistics_chunk* chunk_stats, rmm::cuda_stream_view stream) @@ -2085,7 +2091,7 @@ void EncodePageHeaders(device_span pages, // TODO: single thread task. No need for 128 threads/block. Earlier it used to employ rest of the // threads to coop load structs gpuEncodePageHeaders<<>>( - pages, comp_stats, page_stats, chunk_stats); + pages, comp_results, page_stats, chunk_stats); } void GatherPages(device_span chunks, diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 610275ee26b..d0d367df962 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -322,15 +322,6 @@ inline size_type __device__ row_to_value_idx(size_type idx, return idx; } -/** - * @brief Return worst-case compressed size of compressed data given the uncompressed size - */ -inline size_t __device__ __host__ GetMaxCompressedBfrSize(size_t uncomp_size, - uint32_t num_pages = 1) -{ - return uncomp_size + (uncomp_size >> 7) + num_pages * 8; -} - struct EncPage; /** @@ -389,7 +380,7 @@ struct EncPage { uint32_t num_leaf_values; //!< Values in page. Different from num_rows in case of nested types uint32_t num_values; //!< Number of def/rep level values in page. 
Includes null/empty elements in //!< non-leaf levels - decompress_status* comp_stat; //!< Ptr to compression status + compression_result* comp_res; //!< Ptr to compression result }; /** @@ -544,6 +535,7 @@ void get_dictionary_indices(cudf::detail::device_2dspan * @param[in] num_rowgroups Number of fragments per column * @param[in] num_columns Number of columns * @param[in] page_grstats Setup for page-level stats + * @param[in] page_align Required alignment for uncompressed pages * @param[in] chunk_grstats Setup for chunk-level stats * @param[in] max_page_comp_data_size Calculated maximum compressed data size of pages * @param[in] stream CUDA stream to use, default 0 @@ -556,6 +548,7 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, int32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + uint32_t page_align, statistics_merge_group* page_grstats, statistics_merge_group* chunk_grstats, rmm::cuda_stream_view stream); @@ -566,13 +559,13 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, * @param[in,out] pages Device array of EncPages (unordered) * @param[out] comp_in Compressor input buffers * @param[out] comp_in Compressor output buffers - * @param[out] comp_stats Compressor statuses + * @param[out] comp_stats Compressor results * @param[in] stream CUDA stream to use, default 0 */ void EncodePages(device_span pages, device_span> comp_in, device_span> comp_out, - device_span comp_stats, + device_span comp_res, rmm::cuda_stream_view stream); /** @@ -593,7 +586,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view * @param[in] stream CUDA stream to use, default 0 */ void EncodePageHeaders(device_span pages, - device_span comp_stats, + device_span comp_res, device_span page_stats, const statistics_chunk* chunk_stats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 2553b375e72..59bef6f5600 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -247,13 +247,15 @@ std::tuple conversion_info(type_id column_type_id, return std::make_tuple(type_width, clock_rate, converted_type); } -inline void decompress_check(device_span stats, +inline void decompress_check(device_span results, rmm::cuda_stream_view stream) { CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream), - stats.begin(), - stats.end(), - [] __device__(auto const& stat) { return stat.status == 0; }), + results.begin(), + results.end(), + [] __device__(auto const& res) { + return res.status == compression_status::SUCCESS; + }), "Error during decompression"); } } // namespace @@ -1149,11 +1151,11 @@ rmm::device_buffer reader::impl::decompress_page_data( std::vector> comp_out; comp_out.reserve(num_comp_pages); - rmm::device_uvector comp_stats(num_comp_pages, _stream); + rmm::device_uvector comp_res(num_comp_pages, _stream); thrust::fill(rmm::exec_policy(_stream), - comp_stats.begin(), - comp_stats.end(), - decompress_status{0, static_cast(-1000), 0}); + comp_res.begin(), + comp_res.end(), + compression_result{0, compression_status::FAILURE}); size_t decomp_offset = 0; int32_t start_pos = 0; @@ -1177,31 +1179,30 @@ rmm::device_buffer reader::impl::decompress_page_data( host_span const> comp_out_view(comp_out.data() + start_pos, codec.num_pages); auto const d_comp_out = cudf::detail::make_device_uvector_async(comp_out_view, _stream); - device_span d_comp_stats_view(comp_stats.data() + start_pos, - codec.num_pages); + device_span d_comp_res_view(comp_res.data() + start_pos, 
codec.num_pages); switch (codec.compression_type) { case parquet::GZIP: - gpuinflate(d_comp_in, d_comp_out, d_comp_stats_view, gzip_header_included::YES, _stream); + gpuinflate(d_comp_in, d_comp_out, d_comp_res_view, gzip_header_included::YES, _stream); break; case parquet::SNAPPY: if (nvcomp_integration::is_stable_enabled()) { nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, d_comp_in, d_comp_out, - d_comp_stats_view, + d_comp_res_view, codec.max_decompressed_size, codec.total_decomp_size, _stream); } else { - gpu_unsnap(d_comp_in, d_comp_out, d_comp_stats_view, _stream); + gpu_unsnap(d_comp_in, d_comp_out, d_comp_res_view, _stream); } break; case parquet::ZSTD: nvcomp::batched_decompress(nvcomp::compression_type::ZSTD, d_comp_in, d_comp_out, - d_comp_stats_view, + d_comp_res_view, codec.max_decompressed_size, codec.total_decomp_size, _stream); @@ -1209,7 +1210,7 @@ rmm::device_buffer reader::impl::decompress_page_data( case parquet::BROTLI: gpu_debrotli(d_comp_in, d_comp_out, - d_comp_stats_view, + d_comp_res_view, debrotli_scratch.data(), debrotli_scratch.size(), _stream); @@ -1219,7 +1220,7 @@ rmm::device_buffer reader::impl::decompress_page_data( start_pos += codec.num_pages; } - decompress_check(comp_stats, _stream); + decompress_check(comp_res, _stream); // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 58910420173..2bfd7c1ba4d 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -24,6 +24,7 @@ #include "compact_protocol_reader.hpp" #include "compact_protocol_writer.hpp" +#include #include #include #include @@ -43,8 +44,6 @@ #include #include -#include - #include #include #include @@ -79,6 +78,7 @@ parquet::Compression to_parquet_compression(compression_type compression) switch (compression) { case compression_type::AUTO: case compression_type::SNAPPY: return parquet::Compression::SNAPPY; + case compression_type::ZSTD: return parquet::Compression::ZSTD; case compression_type::NONE: return parquet::Compression::UNCOMPRESSED; default: CUDF_FAIL("Unsupported compression type"); } @@ -907,11 +907,36 @@ void writer::impl::gather_fragment_statistics( stream.synchronize(); } +auto to_nvcomp_compression_type(Compression codec) +{ + if (codec == Compression::SNAPPY) return nvcomp::compression_type::SNAPPY; + if (codec == Compression::ZSTD) return nvcomp::compression_type::ZSTD; + CUDF_FAIL("Unsupported compression type"); +} + +auto page_alignment(Compression codec) +{ + if (codec == Compression::UNCOMPRESSED or + not nvcomp::is_compression_enabled(to_nvcomp_compression_type(codec))) { + return 1u; + } + + return 1u << nvcomp::compress_input_alignment_bits(to_nvcomp_compression_type(codec)); +} + +size_t max_compression_output_size(Compression codec, uint32_t compression_blocksize) +{ + if (codec == Compression::UNCOMPRESSED) return 0; + + return compress_max_output_chunk_size(to_nvcomp_compression_type(codec), compression_blocksize); +} + auto init_page_sizes(hostdevice_2dvector& chunks, device_span col_desc, uint32_t num_columns, size_t max_page_size_bytes, size_type max_page_size_rows, + Compression compression_codec, rmm::cuda_stream_view stream) { if (chunks.is_empty()) { return hostdevice_vector{}; } @@ -926,6 +951,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_codec), nullptr, 
nullptr, stream); @@ -949,6 +975,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_codec), nullptr, nullptr, stream); @@ -956,12 +983,12 @@ auto init_page_sizes(hostdevice_2dvector& chunks, // Get per-page max compressed size hostdevice_vector comp_page_sizes(num_pages, stream); - std::transform(page_sizes.begin(), page_sizes.end(), comp_page_sizes.begin(), [](auto page_size) { - size_t page_comp_max_size = 0; - nvcompBatchedSnappyCompressGetMaxOutputChunkSize( - page_size, nvcompBatchedSnappyDefaultOpts, &page_comp_max_size); - return page_comp_max_size; - }); + std::transform(page_sizes.begin(), + page_sizes.end(), + comp_page_sizes.begin(), + [compression_codec](auto page_size) { + return max_compression_output_size(compression_codec, page_size); + }); comp_page_sizes.host_to_device(stream); // Use per-page max compressed size to calculate chunk.compressed_size @@ -973,6 +1000,7 @@ auto init_page_sizes(hostdevice_2dvector& chunks, num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_codec), nullptr, nullptr, stream); @@ -1091,6 +1119,7 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& num_columns, max_page_size_bytes, max_page_size_rows, + page_alignment(compression_), (num_stats_bfr) ? page_stats_mrg.data() : nullptr, (num_stats_bfr > num_pages) ? page_stats_mrg.data() + num_pages : nullptr, stream); @@ -1109,83 +1138,6 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& stream.synchronize(); } -void snappy_compress(device_span const> comp_in, - device_span const> comp_out, - device_span comp_stats, - size_t max_page_uncomp_data_size, - rmm::cuda_stream_view stream) -{ - size_t num_comp_pages = comp_in.size(); - try { - size_t temp_size; - nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( - num_comp_pages, max_page_uncomp_data_size, nvcompBatchedSnappyDefaultOpts, &temp_size); - - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, - "Error in getting snappy compression scratch size"); - - // Not needed now but nvcomp API makes no promises about future - rmm::device_buffer scratch(temp_size, stream); - // Analogous to comp_in.srcDevice - rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_in.srcSize - rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); - // Analogous to comp_in.dstDevice - rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_stat.bytes_written - rmm::device_uvector compressed_bytes_written(num_comp_pages, stream); - // nvcomp does not currently use comp_in.dstSize. 
Cannot assume that the output will fit in - // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() - - // Prepare the vectors - auto comp_it = - thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin()); - thrust::transform( - rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(auto const& in) { return thrust::make_tuple(in.data(), in.size()); }); - - thrust::transform(rmm::exec_policy(stream), - comp_out.begin(), - comp_out.end(), - compressed_data_ptrs.begin(), - [] __device__(auto const& out) { return out.data(); }); - nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), - uncompressed_data_sizes.data(), - max_page_uncomp_data_size, - num_comp_pages, - scratch.data(), // Not needed rn but future - scratch.size(), - compressed_data_ptrs.data(), - compressed_bytes_written.data(), - nvcompBatchedSnappyDefaultOpts, - stream.value()); - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); - - // nvcomp also doesn't use comp_out.status . It guarantees that given enough output space, - // compression will succeed. - // The other `comp_out` field is `reserved` which is for internal cuIO debugging and can be 0. - thrust::transform(rmm::exec_policy(stream), - compressed_bytes_written.begin(), - compressed_bytes_written.end(), - comp_stats.begin(), - [] __device__(size_t size) { - decompress_status status{}; - status.bytes_written = size; - return status; - }); - return; - } catch (...) { - // If we reach this then there was an error in compressing so set an error status for each page - thrust::for_each(rmm::exec_policy(stream), - comp_stats.begin(), - comp_stats.end(), - [] __device__(decompress_status & stat) { stat.status = 1; }); - }; -} - void writer::impl::encode_pages(hostdevice_2dvector& chunks, device_span pages, size_t max_page_uncomp_data_size, @@ -1209,24 +1161,37 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks rmm::device_uvector> comp_in(max_comp_pages, stream); rmm::device_uvector> comp_out(max_comp_pages, stream); - rmm::device_uvector comp_stats(max_comp_pages, stream); + rmm::device_uvector comp_res(max_comp_pages, stream); + thrust::fill(rmm::exec_policy(stream), + comp_res.begin(), + comp_res.end(), + compression_result{0, compression_status::FAILURE}); - gpu::EncodePages(batch_pages, comp_in, comp_out, comp_stats, stream); + gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream); switch (compression_) { case parquet::Compression::SNAPPY: - if (nvcomp_integration::is_stable_enabled()) { - snappy_compress(comp_in, comp_out, comp_stats, max_page_uncomp_data_size, stream); + if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) { + nvcomp::batched_compress( + nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream); } else { - gpu_snap(comp_in, comp_out, comp_stats, stream); + gpu_snap(comp_in, comp_out, comp_res, stream); + } + break; + case parquet::Compression::ZSTD: + if (nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) { + nvcomp::batched_compress( + nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream); } break; - default: break; + case parquet::Compression::UNCOMPRESSED: break; + default: CUDF_FAIL("invalid compression type"); } + // TBD: Not clear if the official spec actually allows dynamically turning off compression at the // chunk-level auto d_chunks_in_batch = chunks.device_view().subspan(first_rowgroup, 
rowgroups_in_batch); DecideCompression(d_chunks_in_batch.flat_view(), stream); - EncodePageHeaders(batch_pages, comp_stats, batch_pages_stats, chunk_stats, stream); + EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, stream); GatherPages(d_chunks_in_batch.flat_view(), pages, stream); if (column_stats != nullptr) { @@ -1274,6 +1239,18 @@ size_t writer::impl::column_index_buffer_size(gpu::EncColumnChunk* ck) const return ck->ck_stat_size * ck->num_pages + column_index_truncate_length + padding; } +size_t max_page_bytes(Compression compression, size_t max_page_size_bytes) +{ + if (compression == parquet::Compression::UNCOMPRESSED) { return max_page_size_bytes; } + + auto const ncomp_type = to_nvcomp_compression_type(compression); + auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type) + ? nvcomp::compress_max_allowed_chunk_size(ncomp_type) + : std::nullopt; + + return std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes); +} + writer::impl::impl(std::vector> sinks, parquet_writer_options const& options, SingleWriteMode mode, @@ -1281,11 +1258,11 @@ writer::impl::impl(std::vector> sinks, rmm::mr::device_memory_resource* mr) : _mr(mr), stream(stream), + compression_(to_parquet_compression(options.get_compression())), max_row_group_size{options.get_row_group_size_bytes()}, max_row_group_rows{options.get_row_group_size_rows()}, - max_page_size_bytes(options.get_max_page_size_bytes()), + max_page_size_bytes(max_page_bytes(compression_, options.get_max_page_size_bytes())), max_page_size_rows(options.get_max_page_size_rows()), - compression_(to_parquet_compression(options.get_compression())), stats_granularity_(options.get_stats_level()), int96_timestamps(options.is_enabled_int96_timestamps()), column_index_truncate_length(options.get_column_index_truncate_length()), @@ -1306,11 +1283,11 @@ writer::impl::impl(std::vector> sinks, rmm::mr::device_memory_resource* mr) : _mr(mr), stream(stream), + compression_(to_parquet_compression(options.get_compression())), max_row_group_size{options.get_row_group_size_bytes()}, max_row_group_rows{options.get_row_group_size_rows()}, - max_page_size_bytes(options.get_max_page_size_bytes()), + max_page_size_bytes(max_page_bytes(compression_, options.get_max_page_size_bytes())), max_page_size_rows(options.get_max_page_size_rows()), - compression_(to_parquet_compression(options.get_compression())), stats_granularity_(options.get_stats_level()), int96_timestamps(options.is_enabled_int96_timestamps()), column_index_truncate_length(options.get_column_index_truncate_length()), @@ -1405,13 +1382,15 @@ void writer::impl::write(table_view const& table, std::vector co // iteratively reduce this value if the largest fragment exceeds the max page size limit (we // ideally want the page size to be below 1MB so as to have enough pages to get good // compression/decompression performance). 
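The page-size clamping introduced above can be sketched in isolation as follows. This is a simplified model rather than the writer's code: nvcomp_max_chunk is a hypothetical stand-in for nvcomp::compress_max_allowed_chunk_size, and the 64 KiB limit is made up for the example.

#include <algorithm>
#include <cstddef>
#include <optional>

// Hypothetical stand-in for nvcomp::compress_max_allowed_chunk_size(): the codec's
// chunk-size limit, or nullopt when it imposes none.
std::optional<std::size_t> nvcomp_max_chunk(bool has_limit)
{
  return has_limit ? std::optional<std::size_t>(64 * 1024) : std::nullopt;
}

// Same shape as max_page_bytes(): keep the user's limit unless the codec's is smaller.
std::size_t capped_page_size(std::size_t user_max, bool compressed, bool has_limit)
{
  if (not compressed) { return user_max; }
  return std::min(nvcomp_max_chunk(has_limit).value_or(user_max), user_max);
}

int main()
{
  // A 512 KiB request against a codec with a 64 KiB chunk limit is clamped to 64 KiB.
  return capped_page_size(512 * 1024, true, true) == 64 * 1024 ? 0 : 1;
}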
- using cudf::io::parquet::gpu::max_page_fragment_size; + auto max_page_fragment_size = + (cudf::io::parquet::gpu::max_page_fragment_size * max_page_size_bytes) / + default_max_page_size_bytes; std::vector num_frag_in_part; std::transform(partitions.begin(), partitions.end(), std::back_inserter(num_frag_in_part), - [](auto const& part) { + [max_page_fragment_size](auto const& part) { return util::div_rounding_up_unsafe(part.num_rows, max_page_fragment_size); }); @@ -1561,8 +1540,8 @@ void writer::impl::write(table_view const& table, std::vector co } // Build chunk dictionaries and count pages - hostdevice_vector comp_page_sizes = - init_page_sizes(chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, stream); + hostdevice_vector comp_page_sizes = init_page_sizes( + chunks, col_desc, num_columns, max_page_size_bytes, max_page_size_rows, compression_, stream); // Get the maximum page size across all chunks size_type max_page_uncomp_data_size = diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index c6309488d6b..cac75a5dcd9 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -208,11 +208,11 @@ class writer::impl { // Cuda stream to be used rmm::cuda_stream_view stream; + Compression compression_ = Compression::UNCOMPRESSED; size_t max_row_group_size = default_row_group_size_bytes; size_type max_row_group_rows = default_row_group_size_rows; size_t max_page_size_bytes = default_max_page_size_bytes; size_type max_page_size_rows = default_max_page_size_rows; - Compression compression_ = Compression::UNCOMPRESSED; statistics_freq stats_granularity_ = statistics_freq::STATISTICS_NONE; bool int96_timestamps = false; size_type column_index_truncate_length = default_column_index_truncate_length; diff --git a/cpp/tests/io/comp/decomp_test.cpp b/cpp/tests/io/comp/decomp_test.cpp index 134f262cb13..c51a7854e25 100644 --- a/cpp/tests/io/comp/decomp_test.cpp +++ b/cpp/tests/io/comp/decomp_test.cpp @@ -58,7 +58,7 @@ struct DecompressTest : public cudf::test::BaseFixture { inf_out[0] = dst; inf_out.host_to_device(stream); - hostdevice_vector inf_stat(1, stream); + hostdevice_vector inf_stat(1, stream); inf_stat[0] = {}; inf_stat.host_to_device(stream); @@ -66,7 +66,7 @@ struct DecompressTest : public cudf::test::BaseFixture { cudaMemcpyAsync( decompressed->data(), dst.data(), dst.size(), cudaMemcpyDeviceToHost, stream.value()); inf_stat.device_to_host(stream, true); - ASSERT_EQ(inf_stat[0].status, 0); + ASSERT_EQ(inf_stat[0].status, cudf::io::compression_status::SUCCESS); } }; @@ -76,7 +76,7 @@ struct DecompressTest : public cudf::test::BaseFixture { struct GzipDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { cudf::io::gpuinflate(d_inf_in, d_inf_out, @@ -92,7 +92,7 @@ struct GzipDecompressTest : public DecompressTest { struct SnappyDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { cudf::io::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, cudf::default_stream_value); } @@ -104,7 +104,7 @@ struct SnappyDecompressTest : public DecompressTest { struct BrotliDecompressTest : public DecompressTest { void dispatch(device_span> d_inf_in, device_span> d_inf_out, - device_span d_inf_stat) + device_span d_inf_stat) { rmm::device_buffer d_scratch{cudf::io::get_gpu_debrotli_scratch_size(1), cudf::default_stream_value}; diff 
--git a/java/src/main/java/ai/rapids/cudf/CompressionType.java b/java/src/main/java/ai/rapids/cudf/CompressionType.java index 48f980d7f71..96edf1a8add 100644 --- a/java/src/main/java/ai/rapids/cudf/CompressionType.java +++ b/java/src/main/java/ai/rapids/cudf/CompressionType.java @@ -1,6 +1,6 @@ /* * - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,11 +44,21 @@ public enum CompressionType { ZIP(6), /** XZ format using LZMA(2) algorithm */ - XZ(7); + XZ(7), + + /** ZLIB format, using DEFLATE algorithm */ + ZLIB(8), + + /** LZ4 format, using LZ77 */ + LZ4(9), + + /** Lempel–Ziv–Oberhumer format */ + LZO(10), + + /** Zstandard format */ + ZSTD(11); final int nativeId; - CompressionType(int nativeId) { - this.nativeId = nativeId; - } + CompressionType(int nativeId) { this.nativeId = nativeId; } } diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 66b841fd273..1c9f388873c 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -163,6 +163,8 @@ cdef compression_type _get_comp_type(object compression): return compression_type.SNAPPY elif compression == "ZLIB": return compression_type.ZLIB + elif compression == "ZSTD": + return compression_type.ZSTD else: raise ValueError(f"Unsupported `compression` type {compression}") diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 84e0bba7133..3c8e78bd87a 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -670,6 +670,8 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression): return cudf_io_types.compression_type.NONE elif compression == "snappy": return cudf_io_types.compression_type.SNAPPY + elif compression == "ZSTD": + return cudf_io_types.compression_type.ZSTD else: raise ValueError("Unsupported `compression` type") diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index db52e51bd33..c2188003531 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1745,19 +1745,18 @@ def test_writer_protobuf_large_rowindexentry(): assert_frame_equal(df, got) -def test_orc_writer_zlib_compression(list_struct_buff): +@pytest.mark.parametrize("compression", ["ZLIB", "ZSTD"]) +def test_orc_writer_nvcomp(list_struct_buff, compression): expected = cudf.read_orc(list_struct_buff) + + buff = BytesIO() try: - # save with ZLIB compression - buff = BytesIO() - expected.to_orc(buff, compression="ZLIB") - got = cudf.read_orc(buff) + expected.to_orc(buff, compression=compression) + except RuntimeError: + pytest.mark.xfail(reason="Newer nvCOMP version is required") + else: + got = pd.read_orc(buff) assert_eq(expected, got) - except RuntimeError as e: - if "Unsupported compression type" in str(e): - pytest.mark.xfail(reason="nvcomp build doesn't have deflate") - else: - raise e @pytest.mark.parametrize("index", [True, False, None]) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 84d89618909..022f7cdd6f7 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2568,3 +2568,23 @@ def test_parquet_nested_struct_list(): actual = cudf.read_parquet(buffer) assert_eq(expected, actual) assert_eq(actual.a.dtype, df.a.dtype) + + +def test_parquet_writer_zstd(): + size = 12345 + expected = 
cudf.DataFrame( + { + "a": np.arange(0, stop=size, dtype="float64"), + "b": np.random.choice(list("abcd"), size=size), + "c": np.random.choice(np.arange(4), size=size), + } + ) + + buff = BytesIO() + try: + expected.to_parquet(buff, compression="ZSTD") + except RuntimeError: + pytest.xfail(reason="Newer nvCOMP version is required") + else: + got = pd.read_parquet(buff) + assert_eq(expected, got) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index a3bb81c6c24..19815c7c506 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -208,7 +208,7 @@ File path or Root Directory path. Will be used as Root Directory path while writing a partitioned dataset. Use list of str with partition_offsets to write parts of the dataframe to different files. -compression : {'snappy', None}, default 'snappy' +compression : {'snappy', 'ZSTD', None}, default 'snappy' Name of the compression to use. Use ``None`` for no compression. index : bool, default None If ``True``, include the dataframe's index(es) in the file output. If @@ -429,7 +429,7 @@ ---------- fname : str File path or object where the ORC dataset will be stored. -compression : {{ 'snappy', 'ZLIB', None }}, default None +compression : {{ 'snappy', 'ZLIB', 'ZSTD', None }}, default None Name of the compression to use. Use None for no compression. enable_statistics: boolean, default True Enable writing column statistics.
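For completeness, a hedged usage sketch of the newly exposed ZSTD option through the public libcudf C++ API; the output path is made up and error handling is omitted for brevity.

#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

// Write `table` as a ZSTD-compressed Parquet file ("example.parquet" is illustrative).
void write_zstd_parquet(cudf::table_view const& table)
{
  auto const sink = cudf::io::sink_info("example.parquet");
  auto const opts = cudf::io::parquet_writer_options::builder(sink, table)
                      .compression(cudf::io::compression_type::ZSTD)
                      .build();
  cudf::io::write_parquet(opts);
}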