Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use nvcomp's snappy compressor in ORC writer #9242

Merged
merged 31 commits into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
db23741
Initial changes to get nvcomp integrated
devavret May 7, 2021
a5f3363
Using nvcomp provided max compressed buffer size
devavret May 12, 2021
61018aa
Recover from error in nvcomp compressing and encode uncompressed.
devavret May 12, 2021
64d7d1c
review changes
devavret May 13, 2021
27764e7
Replace accidental vector with uvector.
devavret May 14, 2021
95a57ec
Provide the actual max uncomp page size to nvcomp's temp size estimat…
devavret May 14, 2021
cc9500a
cmake changes requested in review
devavret May 14, 2021
7989b9c
Merge branch 'branch-21.10' into parquet-writer-nvcomp-snappy
devavret Aug 19, 2021
f90409c
Merge branch 'branch-21.10' into parquet-writer-nvcomp-snappy
devavret Aug 19, 2021
40ebd1e
Update parquet writer to use nvcomp 2.1
devavret Aug 24, 2021
4a2cb24
One more cmake change related to updating nvcomp
devavret Aug 24, 2021
6019b0f
Update nvcomp to version with fix for snappy decompressor
devavret Aug 31, 2021
140d3d0
Fix allocation size bug
devavret Sep 2, 2021
05f5343
Merge branch 'branch-21.10' into parquet-writer-nvcomp-snappy
devavret Sep 3, 2021
62d92b4
Update cmake to find nvcomp in new manner
devavret Sep 3, 2021
3c73be3
Make nvcomp private in cmake and update get_nvcomp
devavret Sep 7, 2021
e0a013d
Add an env var flip switch to choose b/w nvcomp and inbuilt compressor
devavret Sep 8, 2021
7501b11
Merge branch 'branch-21.10' into parquet-writer-nvcomp-snappy
devavret Sep 8, 2021
bfa1366
Static linking nvcomp into libcudf
devavret Sep 8, 2021
203cf15
Review changes
devavret Sep 9, 2021
99e4f80
Working orc reader with nvcomp
devavret Sep 10, 2021
6721fb8
Merge changes from nvcomp -fPIC
devavret Sep 13, 2021
5391e13
Merge branch 'parquet-writer-nvcomp-snappy' into orc-reader-nvcomp-sn…
devavret Sep 13, 2021
354e229
Merge branch 'branch-21.10' into orc-reader-nvcomp-snappy
devavret Sep 15, 2021
66d49e8
Working ORC writer with nvcomp
devavret Sep 16, 2021
4e78529
Small cleanups. Device span instead of pointers
devavret Sep 16, 2021
8ed68ef
Here you go: range for loop
devavret Sep 16, 2021
8b471de
Add switch to control usage of nvcomp
devavret Sep 16, 2021
34a42c3
Merge branch 'branch-21.10' into orc-writer-nvcomp-snappy
devavret Sep 20, 2021
0569281
Replace magic number 3 with BLOCK_HEADER_SIZE
devavret Sep 21, 2021
11e20e7
Copyright updates
devavret Sep 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/io/orc/orc_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace orc {

// ORC rows are divided into groups and assigned indexes for faster seeking
static constexpr uint32_t default_row_index_stride = 10000;
static constexpr uint32_t BLOCK_HEADER_SIZE = 3;
devavret marked this conversation as resolved.
Show resolved Hide resolved

enum CompressionKind : uint8_t {
NONE = 0,
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
* @param[in] num_compressed_blocks Total number of compressed blocks
* @param[in] compression Type of compression
* @param[in] comp_blk_size Compression block size
* @param[in] max_comp_blk_size Max size of any block after compression
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] enc_streams chunk streams device array [column][rowgroup]
* @param[out] comp_in Per-block compression input parameters
Expand All @@ -365,10 +366,11 @@ void CompressOrcDataStreams(uint8_t* compressed_data,
uint32_t num_compressed_blocks,
CompressionKind compression,
uint32_t comp_blk_size,
uint32_t max_comp_blk_size,
device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
Comment on lines +372 to +373
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it mildly worse to pass a span here if you don't ever use the size? In the sense of having to hypothetically push another argument on the stack.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see it, the trade-off is that with span it's clear that the parameter is an array, not a pointer to a single object. Now, whether this is a good trade-off is up for debate :D

rmm::cuda_stream_view stream);

/**
Expand Down
96 changes: 83 additions & 13 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

#include <cub/cub.cuh>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <nvcomp/snappy.h>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -1102,15 +1105,17 @@ __global__ void __launch_bounds__(1024)
* @param[out] comp_out Per-block compression status
* @param[in] compressed_bfr Compression output buffer
* @param[in] comp_blk_size Compression block size
* @param[in] max_comp_blk_size Max size of any block after compression
*/
// blockDim {256,1,1}
__global__ void __launch_bounds__(256)
gpuInitCompressionBlocks(device_2dspan<StripeStream const> strm_desc,
device_2dspan<encoder_chunk_streams> streams, // const?
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
uint8_t* compressed_bfr,
uint32_t comp_blk_size)
uint32_t comp_blk_size,
uint32_t max_comp_blk_size)
{
__shared__ __align__(16) StripeStream ss;
__shared__ uint8_t* volatile uncomp_base_g;
Expand All @@ -1135,8 +1140,8 @@ __global__ void __launch_bounds__(256)
uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size));
blk_in->srcDevice = src + b * comp_blk_size;
blk_in->srcSize = blk_size;
blk_in->dstDevice = dst + b * (3 + comp_blk_size) + 3; // reserve 3 bytes for block header
blk_in->dstSize = blk_size;
blk_in->dstDevice = dst + b * (BLOCK_HEADER_SIZE + max_comp_blk_size) + BLOCK_HEADER_SIZE;
blk_in->dstSize = max_comp_blk_size;
blk_out->bytes_written = blk_size;
blk_out->status = 1;
blk_out->reserved = 0;
Expand All @@ -1153,14 +1158,16 @@ __global__ void __launch_bounds__(256)
* @param[in] comp_out Per-block compression status
* @param[in] compressed_bfr Compression output buffer
* @param[in] comp_blk_size Compression block size
* @param[in] max_comp_blk_size Max size of any block after compression
*/
// blockDim {1024,1,1}
__global__ void __launch_bounds__(1024)
gpuCompactCompressedBlocks(device_2dspan<StripeStream> strm_desc,
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
uint8_t* compressed_bfr,
uint32_t comp_blk_size)
uint32_t comp_blk_size,
uint32_t max_comp_blk_size)
{
__shared__ __align__(16) StripeStream ss;
__shared__ const uint8_t* volatile comp_src_g;
Expand Down Expand Up @@ -1271,20 +1278,83 @@ void CompressOrcDataStreams(uint8_t* compressed_data,
uint32_t num_compressed_blocks,
CompressionKind compression,
uint32_t comp_blk_size,
uint32_t max_comp_blk_size,
device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
rmm::cuda_stream_view stream)
{
dim3 dim_block_init(256, 1);
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second);
gpuInitCompressionBlocks<<<dim_grid, dim_block_init, 0, stream.value()>>>(
strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size);
if (compression == SNAPPY) { gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); }
strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size);
if (compression == SNAPPY) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP");
bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0;
if (use_nvcomp) {
try {
size_t temp_size;
nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize(
num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, &temp_size);

CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"Error in getting snappy compression scratch size");

rmm::device_buffer scratch(temp_size, stream);
rmm::device_uvector<void const*> uncompressed_data_ptrs(num_compressed_blocks, stream);
rmm::device_uvector<size_t> uncompressed_data_sizes(num_compressed_blocks, stream);
rmm::device_uvector<void*> compressed_data_ptrs(num_compressed_blocks, stream);
rmm::device_uvector<size_t> compressed_bytes_written(num_compressed_blocks, stream);

auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(),
uncompressed_data_sizes.begin(),
compressed_data_ptrs.begin());
thrust::transform(rmm::exec_policy(stream),
comp_in.begin(),
comp_in.end(),
comp_it,
[] __device__(gpu_inflate_input_s in) {
return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice);
});
nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(),
uncompressed_data_sizes.data(),
max_comp_blk_size,
num_compressed_blocks,
scratch.data(),
scratch.size(),
compressed_data_ptrs.data(),
compressed_bytes_written.data(),
nvcompBatchedSnappyDefaultOpts,
stream.value());

CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression");

thrust::transform(rmm::exec_policy(stream),
compressed_bytes_written.begin(),
compressed_bytes_written.end(),
comp_out.begin(),
[] __device__(size_t size) {
gpu_inflate_status_s status{};
status.bytes_written = size;
return status;
Comment on lines +1333 to +1340
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be replaced with a thrust::transform_output_iterator in the call to nvcompBatchedSnappyCompressAsync, basically to save allocating and materializing the compressed_bytes_written? [1]

[1] https://thrust.github.io/doc/classthrust_1_1transform__output__iterator.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvcompBatchedSnappyCompressAsync is a C API and doesn't take iterators. 😞

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. Too bad! Thanks for clarifying 👍

});
} catch (...) {
// If we reach this then there was an error in compressing so set an error status for each
// block
thrust::for_each(rmm::exec_policy(stream),
comp_out.begin(),
comp_out.end(),
[] __device__(gpu_inflate_status_s & stat) { stat.status = 1; });
};

} else {
gpu_snap(comp_in.data(), comp_out.data(), num_compressed_blocks, stream);
}
}
dim3 dim_block_compact(1024, 1);
gpuCompactCompressedBlocks<<<dim_grid, dim_block_compact, 0, stream.value()>>>(
strm_desc, comp_in, comp_out, compressed_data, comp_blk_size);
strm_desc, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size);
}

} // namespace gpu
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
uint32_t max_uncompressed_block_size = 0;
uint32_t num_compressed_blocks = 0;
uint32_t num_uncompressed_blocks = 0;
while (cur + 3 < end) {
while (cur + BLOCK_HEADER_SIZE < end) {
devavret marked this conversation as resolved.
Show resolved Hide resolved
uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0);
uint32_t is_uncompressed = block_len & 1;
uint32_t uncompressed_size;
gpu_inflate_input_s* init_ctl = nullptr;
block_len >>= 1;
cur += 3;
cur += BLOCK_HEADER_SIZE;
if (block_len > block_size || cur + block_len > end) {
// Fatal
num_compressed_blocks = 0;
Expand Down Expand Up @@ -145,12 +145,12 @@ extern "C" __global__ void __launch_bounds__(128, 8)
uint32_t num_compressed_blocks = 0;
uint32_t max_compressed_blocks = s->info.num_compressed_blocks;

while (cur + 3 < end) {
while (cur + BLOCK_HEADER_SIZE < end) {
uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0);
uint32_t is_uncompressed = block_len & 1;
uint32_t uncompressed_size_est, uncompressed_size_actual;
block_len >>= 1;
cur += 3;
cur += BLOCK_HEADER_SIZE;
if (cur + block_len > end) { break; }
if (is_uncompressed) {
uncompressed_size_est = block_len;
Expand Down Expand Up @@ -367,9 +367,11 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s,
for (;;) {
uint32_t block_len, is_uncompressed;

if (cur + 3 > end || cur + 3 >= start + compressed_offset) { break; }
if (cur + BLOCK_HEADER_SIZE > end || cur + BLOCK_HEADER_SIZE >= start + compressed_offset) {
break;
}
block_len = cur[0] | (cur[1] << 8) | (cur[2] << 16);
cur += 3;
cur += BLOCK_HEADER_SIZE;
is_uncompressed = block_len & 1;
block_len >>= 1;
cur += block_len;
Expand Down
51 changes: 28 additions & 23 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>

#include <nvcomp/snappy.h>

#include <algorithm>
#include <cstring>
#include <numeric>
Expand Down Expand Up @@ -999,10 +1001,10 @@ void writer::impl::write_index_stream(int32_t stripe_id,
record.pos += stream.lengths[type];
while ((record.pos >= 0) && (record.blk_pos >= 0) &&
(static_cast<size_t>(record.pos) >= compression_blocksize_) &&
(record.comp_pos + 3 + comp_out[record.blk_pos].bytes_written <
(record.comp_pos + BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written <
static_cast<size_t>(record.comp_size))) {
record.pos -= compression_blocksize_;
record.comp_pos += 3 + comp_out[record.blk_pos].bytes_written;
record.comp_pos += BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written;
record.blk_pos += 1;
}
}
Expand Down Expand Up @@ -1472,29 +1474,31 @@ void writer::impl::write(table_view const& table)
}

// Allocate intermediate output stream buffer
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
auto stream_output = [&]() {
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
size_t max_compressed_block_size = 0;
if (compression_kind_ != NONE) {
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
vuule marked this conversation as resolved.
Show resolved Hide resolved
compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size);
}
auto stream_output = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;

for (size_t stripe_id = 0; stripe_id < segmentation.num_stripes(); stripe_id++) {
for (size_t i = 0; i < num_data_streams; i++) { // TODO range for (at least)
gpu::StripeStream* ss = &strm_descs[stripe_id][i];
if (!out_sink_->is_device_write_preferred(ss->stream_size)) { all_device_write = false; }
size_t stream_size = ss->stream_size;
if (compression_kind_ != NONE) {
ss->first_block = num_compressed_blocks;
ss->bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * 3;
num_compressed_blocks += num_blocks;
compressed_bfr_size += stream_size;
}
max_stream_size = std::max(max_stream_size, stream_size);
for (auto& ss : strm_descs.host_view().flat_view()) {
if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; }
size_t stream_size = ss.stream_size;
if (compression_kind_ != NONE) {
ss.first_block = num_compressed_blocks;
ss.bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * BLOCK_HEADER_SIZE;
num_compressed_blocks += num_blocks;
compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks;
}
max_stream_size = std::max(max_stream_size, stream_size);
}

if (all_device_write) {
Expand All @@ -1519,10 +1523,11 @@ void writer::impl::write(table_view const& table)
num_compressed_blocks,
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
strm_descs,
enc_data.streams,
comp_in.device_ptr(),
comp_out.device_ptr(),
comp_in,
comp_out,
stream);
strm_descs.device_to_host(stream);
comp_out.device_to_host(stream, true);
Expand Down