Skip to content

Commit

Permalink
Use new nvCOMP API to optimize the decompression temp memory size (#11064)

Browse files Browse the repository at this point in the history

nvCOMP 2.3.1 introduces expanded API to determine the amount of temporary memory required to decompress a set of chunks. New API also takes the maximum total size of the decompressed data for all chunks.
This PR expands the nvCOMP adapter to use this API when available (nvCOMP version >= 2.3.1). With older versions the older API is still used and there is no behavior change.
The ORC and Parquet readers now pass the total decompressed data size to the nvCOMP adapter decompression calls.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Devavret Makkar (https://github.com/devavret)
  - Jim Brennan (https://github.com/jbrennan333)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Bradley Dice (https://github.com/bdice)

URL: #11064
  • Loading branch information
vuule authored Jun 16, 2022
1 parent f5faa99 commit f2d471e
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 14 deletions.
49 changes: 44 additions & 5 deletions cpp/src/io/comp/nvcomp_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,39 @@
#define NVCOMP_HAS_DEFLATE 0
#endif

#if NVCOMP_MAJOR_VERSION > 2 or (NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION > 3) or \
(NVCOMP_MAJOR_VERSION == 2 and NVCOMP_MINOR_VERSION == 3 and NVCOMP_PATCH_VERSION >= 1)
#define NVCOMP_HAS_TEMPSIZE_EX 1
#else
#define NVCOMP_HAS_TEMPSIZE_EX 0
#endif

namespace cudf::io::nvcomp {

#if NVCOMP_HAS_TEMPSIZE_EX
// Dispatcher for nvcompBatched<format>DecompressGetTempSizeEx
//
// Forwards the remaining arguments to the format-specific nvCOMP "Ex"
// temp-size query (the expanded API available from nvCOMP 2.3.1, per
// NVCOMP_HAS_TEMPSIZE_EX above, which also accepts the maximum total
// uncompressed size). Formats compiled out of this nvCOMP build
// (NVCOMP_HAS_ZSTD / NVCOMP_HAS_DEFLATE unset) hit CUDF_FAIL instead;
// since CUDF_FAIL throws, the guarded `case` labels cannot fall through.
template <typename... Args>
auto batched_decompress_get_temp_size_ex(compression_type compression, Args&&... args)
{
switch (compression) {
case compression_type::SNAPPY:
return nvcompBatchedSnappyDecompressGetTempSizeEx(std::forward<Args>(args)...);
case compression_type::ZSTD:
#if NVCOMP_HAS_ZSTD
return nvcompBatchedZstdDecompressGetTempSizeEx(std::forward<Args>(args)...);
#else
CUDF_FAIL("Unsupported compression type");
#endif
case compression_type::DEFLATE:
#if NVCOMP_HAS_DEFLATE
return nvcompBatchedDeflateDecompressGetTempSizeEx(std::forward<Args>(args)...);
#else
CUDF_FAIL("Unsupported compression type");
#endif
default: CUDF_FAIL("Unsupported compression type");
}
}
#else
// Dispatcher for nvcompBatched<format>DecompressGetTempSize
template <typename... Args>
auto batched_decompress_get_temp_size(compression_type compression, Args&&... args)
Expand All @@ -61,6 +92,7 @@ auto batched_decompress_get_temp_size(compression_type compression, Args&&... ar
default: CUDF_FAIL("Unsupported compression type");
}
}
#endif

// Dispatcher for nvcompBatched<format>DecompressAsync
template <typename... Args>
Expand All @@ -87,11 +119,17 @@ auto batched_decompress_async(compression_type compression, Args&&... args)

size_t batched_decompress_temp_size(compression_type compression,
size_t num_chunks,
size_t max_uncomp_chunk_size)
size_t max_uncomp_chunk_size,
size_t max_total_uncomp_size)
{
size_t temp_size = 0;
nvcompStatus_t nvcomp_status =
#if NVCOMP_HAS_TEMPSIZE_EX
auto const nvcomp_status = batched_decompress_get_temp_size_ex(
compression, num_chunks, max_uncomp_chunk_size, &temp_size, max_total_uncomp_size);
#else
auto const nvcomp_status =
batched_decompress_get_temp_size(compression, num_chunks, max_uncomp_chunk_size, &temp_size);
#endif
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"Unable to get scratch size for decompression");

Expand All @@ -103,6 +141,7 @@ void batched_decompress(compression_type compression,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
size_t max_uncomp_chunk_size,
size_t max_total_uncomp_size,
rmm::cuda_stream_view stream)
{
// TODO Consolidate config use to a common location
Expand All @@ -123,8 +162,9 @@ void batched_decompress(compression_type compression,
rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_chunks, stream);
rmm::device_uvector<nvcompStatus_t> nvcomp_statuses(num_chunks, stream);
// Temporary space required for decompression
rmm::device_buffer scratch(
batched_decompress_temp_size(compression, num_chunks, max_uncomp_chunk_size), stream);
auto const temp_size = batched_decompress_temp_size(
compression, num_chunks, max_uncomp_chunk_size, max_total_uncomp_size);
rmm::device_buffer scratch(temp_size, stream);
auto const nvcomp_status = batched_decompress_async(compression,
nvcomp_args.input_data_ptrs.data(),
nvcomp_args.input_data_sizes.data(),
Expand Down Expand Up @@ -153,7 +193,6 @@ auto batched_compress_temp_size(compression_type compression,
nvcomp_status = nvcompBatchedSnappyCompressGetTempSize(
batch_size, max_uncompressed_chunk_bytes, nvcompBatchedSnappyDefaultOpts, &temp_size);
break;

case compression_type::DEFLATE:
#if NVCOMP_HAS_DEFLATE
nvcomp_status = nvcompBatchedDeflateCompressGetTempSize(
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/io/comp/nvcomp_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ enum class compression_type { SNAPPY, ZSTD, DEFLATE };
* @param[out] outputs List of output buffers
* @param[out] statuses List of output status structures
* @param[in] max_uncomp_chunk_size maximum size of uncompressed chunk
* @param[in] max_total_uncomp_size maximum total size of uncompressed data
* @param[in] stream CUDA stream to use
*/
void batched_decompress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
size_t max_uncomp_chunk_size,
size_t max_total_uncomp_size,
rmm::cuda_stream_view stream);

/**
Expand Down
9 changes: 7 additions & 2 deletions cpp/src/io/comp/uncomp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,13 @@ size_t decompress_zstd(host_span<uint8_t const> src,

auto hd_stats = hostdevice_vector<decompress_status>(1, stream);
auto const max_uncomp_page_size = dst.size();
nvcomp::batched_decompress(
nvcomp::compression_type::ZSTD, hd_srcs, hd_dsts, hd_stats, max_uncomp_page_size, stream);
nvcomp::batched_decompress(nvcomp::compression_type::ZSTD,
hd_srcs,
hd_dsts,
hd_stats,
max_uncomp_page_size,
max_uncomp_page_size,
stream);

hd_stats.device_to_host(stream, true);
CUDF_EXPECTS(hd_stats[0].status == 0, "ZSTD decompression failed");
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
inflate_out_view,
inflate_stats,
max_uncomp_block_size,
total_decomp_size,
stream);
} else {
gpuinflate(
Expand All @@ -388,6 +389,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
inflate_out_view,
inflate_stats,
max_uncomp_block_size,
total_decomp_size,
stream);
} else {
gpu_unsnap(inflate_in_view, inflate_out_view, inflate_stats, stream);
Expand All @@ -399,6 +401,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
inflate_out_view,
inflate_stats,
max_uncomp_block_size,
total_decomp_size,
stream);
break;
default: CUDF_FAIL("Unexpected decompression dispatch"); break;
Expand Down
18 changes: 11 additions & 7 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1101,15 +1101,16 @@ rmm::device_buffer reader::impl::decompress_page_data(
size_t total_decomp_size = 0;

struct codec_stats {
parquet::Compression compression_type;
size_t num_pages;
int32_t max_decompressed_size;
parquet::Compression compression_type = UNCOMPRESSED;
size_t num_pages = 0;
int32_t max_decompressed_size = 0;
size_t total_decomp_size = 0;
};

std::array codecs{codec_stats{parquet::GZIP, 0, 0},
codec_stats{parquet::SNAPPY, 0, 0},
codec_stats{parquet::BROTLI, 0, 0},
codec_stats{parquet::ZSTD, 0, 0}};
std::array codecs{codec_stats{parquet::GZIP},
codec_stats{parquet::SNAPPY},
codec_stats{parquet::BROTLI},
codec_stats{parquet::ZSTD}};

auto is_codec_supported = [&codecs](int8_t codec) {
if (codec == parquet::UNCOMPRESSED) return true;
Expand All @@ -1128,6 +1129,7 @@ rmm::device_buffer reader::impl::decompress_page_data(
for_each_codec_page(codec.compression_type, [&](size_t page) {
auto page_uncomp_size = pages[page].uncompressed_page_size;
total_decomp_size += page_uncomp_size;
codec.total_decomp_size += page_uncomp_size;
codec.max_decompressed_size = std::max(codec.max_decompressed_size, page_uncomp_size);
codec.num_pages++;
num_comp_pages++;
Expand Down Expand Up @@ -1187,6 +1189,7 @@ rmm::device_buffer reader::impl::decompress_page_data(
d_comp_out,
d_comp_stats_view,
codec.max_decompressed_size,
codec.total_decomp_size,
stream);
} else {
gpu_unsnap(d_comp_in, d_comp_out, d_comp_stats_view, stream);
Expand All @@ -1198,6 +1201,7 @@ rmm::device_buffer reader::impl::decompress_page_data(
d_comp_out,
d_comp_stats_view,
codec.max_decompressed_size,
codec.total_decomp_size,
stream);
break;
case parquet::BROTLI:
Expand Down

0 comments on commit f2d471e

Please sign in to comment.