Skip to content

Commit

Permalink
Refactor host decompression in ORC reader (#10764)
Browse files Browse the repository at this point in the history
Another prequel to ORC Zstandard support.
Irons out the various decompression interfaces in cuIO:

- Removes redundant compression type `enum`.
- Replaces `HostDecompressor` classes with free functions.
- API improvements: use `span` for buffers, and replace error codes/invalid return values with `CUDF_EXPECTS`.
- Use `uint8_t` consistently as the raw data type.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - https://github.com/nvdbaranec
  - Bradley Dice (https://github.com/bdice)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #10764
  • Loading branch information
vuule authored May 16, 2022
1 parent 42438de commit 712e77f
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 353 deletions.
6 changes: 5 additions & 1 deletion cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ enum class compression_type {
BZIP2, ///< BZIP2 format, using Burrows-Wheeler transform
BROTLI, ///< BROTLI format, using LZ77 + Huffman + 2nd order context modeling
ZIP, ///< ZIP format, using DEFLATE algorithm
XZ ///< XZ format, using LZMA(2) algorithm
XZ, ///< XZ format, using LZMA(2) algorithm
ZLIB, ///< ZLIB format, using DEFLATE algorithm
LZ4, ///< LZ4 format, using LZ77
LZO, ///< Lempel–Ziv–Oberhumer format
ZSTD ///< Zstandard format
};

/**
Expand Down
41 changes: 13 additions & 28 deletions cpp/src/io/comp/io_uncomp.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2021, NVIDIA CORPORATION.
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,35 +27,20 @@ using cudf::host_span;

namespace cudf {
namespace io {
enum {
IO_UNCOMP_STREAM_TYPE_INFER = 0,
IO_UNCOMP_STREAM_TYPE_GZIP = 1,
IO_UNCOMP_STREAM_TYPE_ZIP = 2,
IO_UNCOMP_STREAM_TYPE_BZIP2 = 3,
IO_UNCOMP_STREAM_TYPE_XZ = 4,
IO_UNCOMP_STREAM_TYPE_INFLATE = 5,
IO_UNCOMP_STREAM_TYPE_SNAPPY = 6,
IO_UNCOMP_STREAM_TYPE_BROTLI = 7,
IO_UNCOMP_STREAM_TYPE_LZ4 = 8,
IO_UNCOMP_STREAM_TYPE_LZO = 9,
IO_UNCOMP_STREAM_TYPE_ZSTD = 10,
};

std::vector<char> io_uncompress_single_h2d(void const* src, size_t src_size, int stream_type);

std::vector<char> get_uncompressed_data(host_span<char const> data, compression_type compression);

class HostDecompressor {
public:
virtual size_t Decompress(uint8_t* dstBytes,
size_t dstLen,
uint8_t const* srcBytes,
size_t srcLen) = 0;
virtual ~HostDecompressor() {}
/**
* @brief Decompresses a system memory buffer.
*
* @param compression Type of compression of the input data
* @param src Compressed host buffer
*
* @return Vector containing the decompressed output
*/
std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src);

public:
static std::unique_ptr<HostDecompressor> Create(int stream_type);
};
size_t decompress(compression_type compression,
host_span<uint8_t const> src,
host_span<uint8_t> dst);

/**
* @brief GZIP header flags
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/io/comp/nvcomp_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,37 @@
namespace cudf::io::nvcomp {

template <typename... Args>
auto batched_decompress_get_temp_size(compression_type type, Args&&... args)
auto batched_decompress_get_temp_size(compression_type compression, Args&&... args)
{
switch (type) {
switch (compression) {
case compression_type::SNAPPY:
return nvcompBatchedSnappyDecompressGetTempSize(std::forward<Args>(args)...);
default: CUDF_FAIL("Unsupported compression type");
}
};

template <typename... Args>
auto batched_decompress_async(compression_type type, Args&&... args)
auto batched_decompress_async(compression_type compression, Args&&... args)
{
switch (type) {
switch (compression) {
case compression_type::SNAPPY:
return nvcompBatchedSnappyDecompressAsync(std::forward<Args>(args)...);
default: CUDF_FAIL("Unsupported compression type");
}
};

size_t get_temp_size(compression_type type, size_t num_chunks, size_t max_uncomp_chunk_size)
size_t get_temp_size(compression_type compression, size_t num_chunks, size_t max_uncomp_chunk_size)
{
size_t temp_size = 0;
nvcompStatus_t nvcomp_status =
batched_decompress_get_temp_size(type, num_chunks, max_uncomp_chunk_size, &temp_size);
batched_decompress_get_temp_size(compression, num_chunks, max_uncomp_chunk_size, &temp_size);
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"Unable to get scratch size for decompression");

return temp_size;
}

void batched_decompress(compression_type type,
void batched_decompress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
Expand All @@ -67,8 +67,8 @@ void batched_decompress(compression_type type,
rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_chunks, stream);
rmm::device_uvector<nvcompStatus_t> nvcomp_statuses(num_chunks, stream);
// Temporary space required for decompression
rmm::device_buffer scratch(get_temp_size(type, num_chunks, max_uncomp_chunk_size), stream);
auto const nvcomp_status = batched_decompress_async(type,
rmm::device_buffer scratch(get_temp_size(compression, num_chunks, max_uncomp_chunk_size), stream);
auto const nvcomp_status = batched_decompress_async(compression,
nvcomp_args.compressed_data_ptrs.data(),
nvcomp_args.compressed_data_sizes.data(),
nvcomp_args.uncompressed_data_sizes.data(),
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/comp/nvcomp_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ enum class compression_type { SNAPPY };
* @param[in] max_uncomp_page_size maximum size of uncompressed block
* @param[in] stream CUDA stream to use
*/
void batched_decompress(compression_type type,
void batched_decompress(compression_type compression,
device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
Expand Down
Loading

0 comments on commit 712e77f

Please sign in to comment.