diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h
index 004812615eb..30687331c15 100644
--- a/cpp/src/io/orc/orc_gpu.h
+++ b/cpp/src/io/orc/orc_gpu.h
@@ -48,7 +48,8 @@ struct CompressedStreamInfo {
     copyctl(nullptr),
     num_compressed_blocks(0),
     num_uncompressed_blocks(0),
-    max_uncompressed_size(0)
+    max_uncompressed_size(0),
+    max_uncompressed_block_size(0)
   {
   }
   const uint8_t* compressed_data;  // [in] base ptr to compressed stream data
@@ -60,9 +61,10 @@ struct CompressedStreamInfo {
     copyctl;  // [in] base ptr to copy structure to be filled for uncompressed blocks
   uint32_t num_compressed_blocks;  // [in,out] number of entries in decctl(in), number of compressed
                                    // blocks(out)
-  uint32_t num_uncompressed_blocks;  // [in,out] number of entries in copyctl(in), number of
-                                     // uncompressed blocks(out)
-  uint64_t max_uncompressed_size;  // [out] maximum uncompressed data size
+  uint32_t num_uncompressed_blocks;      // [in,out] number of entries in copyctl(in), number of
+                                         // uncompressed blocks(out)
+  uint64_t max_uncompressed_size;        // [out] maximum uncompressed data size of stream
+  uint32_t max_uncompressed_block_size;  // [out] maximum uncompressed size of any block in stream
 };
 
 enum StreamIndexType {
diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu
index a8e44bf3834..b15c5a0941d 100644
--- a/cpp/src/io/orc/reader_impl.cu
+++ b/cpp/src/io/orc/reader_impl.cu
@@ -38,6 +38,8 @@
 #include 
 #include 
 
+#include <nvcomp/snappy.h>
+
 #include 
 #include 
@@ -549,6 +551,68 @@ class aggregate_orc_metadata {
   }
 };
 
+void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
+                       device_span<gpu_inflate_status_s> comp_stat,
+                       size_t max_uncomp_page_size,
+                       rmm::cuda_stream_view stream)
+{
+  size_t num_blocks = comp_in.size();
+  size_t temp_size;
+
+  auto status =
+    nvcompBatchedSnappyDecompressGetTempSize(num_blocks, max_uncomp_page_size, &temp_size);
+  CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status,
+               "Unable to get scratch size for snappy decompression");
+
+  rmm::device_buffer scratch(temp_size, stream);
+  rmm::device_uvector<void const*> compressed_data_ptrs(num_blocks, stream);
+  rmm::device_uvector<size_t> compressed_data_sizes(num_blocks, stream);
+  rmm::device_uvector<void*> uncompressed_data_ptrs(num_blocks, stream);
+  rmm::device_uvector<size_t> uncompressed_data_sizes(num_blocks, stream);
+
+  rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_blocks, stream);
+  rmm::device_uvector<nvcompStatus_t> statuses(num_blocks, stream);
+
+  // Prepare the vectors
+  auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(),
+                                           compressed_data_sizes.begin(),
+                                           uncompressed_data_ptrs.begin(),
+                                           uncompressed_data_sizes.data());
+  thrust::transform(rmm::exec_policy(stream),
+                    comp_in.begin(),
+                    comp_in.end(),
+                    comp_it,
+                    [] __device__(gpu_inflate_input_s in) {
+                      return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize);
+                    });
+
+  status = nvcompBatchedSnappyDecompressAsync(compressed_data_ptrs.data(),
+                                              compressed_data_sizes.data(),
+                                              uncompressed_data_sizes.data(),
+                                              actual_uncompressed_data_sizes.data(),
+                                              num_blocks,
+                                              scratch.data(),
+                                              scratch.size(),
+                                              uncompressed_data_ptrs.data(),
+                                              statuses.data(),
+                                              stream.value());
+  CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression");
+
+  CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
+                             statuses.begin(),
+                             statuses.end(),
+                             thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)),
+               "Error during snappy decompression");
+
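+  // Copy each block's actual decompressed size into the reader's per-block status
+  // structs so downstream kernels see the same outputs the gpu_unsnap path produces.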
+  thrust::for_each_n(
+    rmm::exec_policy(stream),
+    thrust::make_counting_iterator(0),
+    num_blocks,
+    [=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) {
+      comp_stat[i].bytes_written = actual_uncomp_sizes[i];
+      comp_stat[i].status        = 0;
+    });
+}
+
 rmm::device_buffer reader::impl::decompress_stripe_data(
   cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks,
   const std::vector<rmm::device_buffer>& stripe_data,
@@ -592,9 +656,10 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
   rmm::device_uvector<gpu_inflate_status_s> inflate_out(num_compressed_blocks, stream);
 
   // Parse again to populate the decompression input/output buffers
-  size_t decomp_offset      = 0;
-  uint32_t start_pos        = 0;
-  uint32_t start_pos_uncomp = (uint32_t)num_compressed_blocks;
+  size_t decomp_offset           = 0;
+  uint32_t max_uncomp_block_size = 0;
+  uint32_t start_pos             = 0;
+  uint32_t start_pos_uncomp      = (uint32_t)num_compressed_blocks;
   for (size_t i = 0; i < compinfo.size(); ++i) {
     auto dst_base                 = static_cast<uint8_t*>(decomp_data.data());
     compinfo[i].uncompressed_data = dst_base + decomp_offset;
@@ -606,6 +671,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
     decomp_offset += compinfo[i].max_uncompressed_size;
     start_pos += compinfo[i].num_compressed_blocks;
     start_pos_uncomp += compinfo[i].num_uncompressed_blocks;
+    max_uncomp_block_size =
+      std::max(max_uncomp_block_size, compinfo[i].max_uncompressed_block_size);
   }
   compinfo.host_to_device(stream);
   gpu::ParseCompressedStripeData(compinfo.device_ptr(),
@@ -616,13 +683,24 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
 
   // Dispatch batches of blocks to decompress
   if (num_compressed_blocks > 0) {
+    auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP");
+    bool use_nvcomp     = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0;
     switch (decompressor->GetKind()) {
       case orc::ZLIB:
         CUDA_TRY(
           gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream));
         break;
       case orc::SNAPPY:
-        CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream));
+        if (use_nvcomp) {
+          device_span<gpu_inflate_input_s> inflate_in_view{inflate_in.data(),
+                                                           num_compressed_blocks};
+          device_span<gpu_inflate_status_s> inflate_out_view{inflate_out.data(),
+                                                             num_compressed_blocks};
+          snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream);
+        } else {
+          CUDA_TRY(
+            gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream));
+        }
         break;
       default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break;
     }
diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu
index abef4f57894..94d8de6561b 100644
--- a/cpp/src/io/orc/stripe_init.cu
+++ b/cpp/src/io/orc/stripe_init.cu
@@ -45,12 +45,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
   __syncthreads();
   if (strm_id < num_streams) {
     // Walk through the compressed blocks
-    const uint8_t* cur               = s->info.compressed_data;
-    const uint8_t* end               = cur + s->info.compressed_data_size;
-    uint8_t* uncompressed            = s->info.uncompressed_data;
-    size_t max_uncompressed_size     = 0;
-    uint32_t num_compressed_blocks   = 0;
-    uint32_t num_uncompressed_blocks = 0;
+    const uint8_t* cur                   = s->info.compressed_data;
+    const uint8_t* end                   = cur + s->info.compressed_data_size;
+    uint8_t* uncompressed                = s->info.uncompressed_data;
+    size_t max_uncompressed_size         = 0;
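+    // Largest single uncompressed block in this stream; feeds nvcomp's scratch-size query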
+    uint32_t max_uncompressed_block_size = 0;
+    uint32_t num_compressed_blocks       = 0;
+    uint32_t num_uncompressed_blocks     = 0;
     while (cur + 3 < end) {
       uint32_t block_len       = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0);
       uint32_t is_uncompressed = block_len & 1;
@@ -60,8 +61,9 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
       cur += 3;
       if (block_len > block_size || cur + block_len > end) {
         // Fatal
-        num_compressed_blocks = 0;
-        max_uncompressed_size = 0;
+        num_compressed_blocks       = 0;
+        max_uncompressed_size       = 0;
+        max_uncompressed_block_size = 0;
         break;
       }
       // TBD: For some codecs like snappy, it wouldn't be too difficult to get the actual
@@ -102,12 +104,14 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
       if (init_ctl && lane_id == 0) *init_ctl = s->ctl;
       cur += block_len;
       max_uncompressed_size += uncompressed_size;
+      max_uncompressed_block_size = max(max_uncompressed_block_size, uncompressed_size);
     }
     __syncwarp();
     if (!lane_id) {
-      s->info.num_compressed_blocks   = num_compressed_blocks;
-      s->info.num_uncompressed_blocks = num_uncompressed_blocks;
-      s->info.max_uncompressed_size   = max_uncompressed_size;
+      s->info.num_compressed_blocks        = num_compressed_blocks;
+      s->info.num_uncompressed_blocks      = num_uncompressed_blocks;
+      s->info.max_uncompressed_size        = max_uncompressed_size;
+      s->info.max_uncompressed_block_size  = max_uncompressed_block_size;
     }
   }
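
Usage note: the nvcomp snappy path added above is opt-in and defaults off; it is taken only
when the LIBCUDF_USE_NVCOMP environment variable is set and parses to a non-zero integer in
the reading process. A minimal sketch of exercising the gate, assuming a snappy-compressed
ORC file (the file and program names are hypothetical; the reader API is cudf's public
cudf::io::read_orc):

    // Build against libcudf, then run as: LIBCUDF_USE_NVCOMP=1 ./orc_read_example
    // Snappy-compressed stripes then decompress through nvcomp's batched API
    // instead of the in-house gpu_unsnap kernel.
    #include <cudf/io/orc.hpp>

    int main()
    {
      auto opts =
        cudf::io::orc_reader_options::builder(cudf::io::source_info{"example.orc"}).build();
      auto result = cudf::io::read_orc(opts);
      return result.tbl->num_columns() > 0 ? 0 : 1;
    }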