diff --git a/cpp/src/io/orc/orc_common.h b/cpp/src/io/orc/orc_common.h index ab6788d01f1..eedaa9d4fc2 100644 --- a/cpp/src/io/orc/orc_common.h +++ b/cpp/src/io/orc/orc_common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ namespace orc { // ORC rows are divided into groups and assigned indexes for faster seeking static constexpr uint32_t default_row_index_stride = 10000; +static constexpr uint32_t BLOCK_HEADER_SIZE = 3; enum CompressionKind : uint8_t { NONE = 0, diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index 30687331c15..88d7e26b3b6 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -355,6 +355,7 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @param[in] num_compressed_blocks Total number of compressed blocks * @param[in] compression Type of compression * @param[in] comp_blk_size Compression block size + * @param[in] max_comp_blk_size Max size of any block after compression * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in,out] enc_streams chunk streams device array [column][rowgroup] * @param[out] comp_in Per-block compression input parameters @@ -365,10 +366,11 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, + uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index d50d3898c3b..9348d817dad 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -24,6 +24,9 @@ #include #include +#include + +#include namespace cudf { namespace io { @@ -1102,15 +1105,17 @@ __global__ void __launch_bounds__(1024) * @param[out] comp_out Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size + * @param[in] max_comp_blk_size Max size of any block after compression */ // blockDim {256,1,1} __global__ void __launch_bounds__(256) gpuInitCompressionBlocks(device_2dspan strm_desc, device_2dspan streams, // const? - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, uint8_t* compressed_bfr, - uint32_t comp_blk_size) + uint32_t comp_blk_size, + uint32_t max_comp_blk_size) { __shared__ __align__(16) StripeStream ss; __shared__ uint8_t* volatile uncomp_base_g; @@ -1135,8 +1140,8 @@ __global__ void __launch_bounds__(256) uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); blk_in->srcDevice = src + b * comp_blk_size; blk_in->srcSize = blk_size; - blk_in->dstDevice = dst + b * (3 + comp_blk_size) + 3; // reserve 3 bytes for block header - blk_in->dstSize = blk_size; + blk_in->dstDevice = dst + b * (BLOCK_HEADER_SIZE + max_comp_blk_size) + BLOCK_HEADER_SIZE; + blk_in->dstSize = max_comp_blk_size; blk_out->bytes_written = blk_size; blk_out->status = 1; blk_out->reserved = 0; @@ -1153,14 +1158,16 @@ __global__ void __launch_bounds__(256) * @param[in] comp_out Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size + * @param[in] max_comp_blk_size Max size of any block after compression */ // blockDim {1024,1,1} __global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(device_2dspan strm_desc, - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, uint8_t* compressed_bfr, - uint32_t comp_blk_size) + uint32_t comp_blk_size, + uint32_t max_comp_blk_size) { __shared__ __align__(16) StripeStream ss; __shared__ const uint8_t* volatile comp_src_g; @@ -1271,20 +1278,83 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t num_compressed_blocks, CompressionKind compression, uint32_t comp_blk_size, + uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, - gpu_inflate_input_s* comp_in, - gpu_inflate_status_s* comp_out, + device_span comp_in, + device_span comp_out, rmm::cuda_stream_view stream) { dim3 dim_block_init(256, 1); dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); gpuInitCompressionBlocks<<>>( - strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size); - if (compression == SNAPPY) { gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); } + strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); + if (compression == SNAPPY) { + auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP"); + bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0; + if (use_nvcomp) { + try { + size_t temp_size; + nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize( + num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, &temp_size); + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, + "Error in getting snappy compression scratch size"); + + rmm::device_buffer scratch(temp_size, stream); + rmm::device_uvector uncompressed_data_ptrs(num_compressed_blocks, stream); + rmm::device_uvector uncompressed_data_sizes(num_compressed_blocks, stream); + rmm::device_uvector compressed_data_ptrs(num_compressed_blocks, stream); + rmm::device_uvector compressed_bytes_written(num_compressed_blocks, stream); + + auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), + uncompressed_data_sizes.begin(), + compressed_data_ptrs.begin()); + thrust::transform(rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(gpu_inflate_input_s in) { + return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); + }); + nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), + uncompressed_data_sizes.data(), + max_comp_blk_size, + num_compressed_blocks, + scratch.data(), + scratch.size(), + compressed_data_ptrs.data(), + compressed_bytes_written.data(), + nvcompBatchedSnappyDefaultOpts, + stream.value()); + + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression"); + + thrust::transform(rmm::exec_policy(stream), + compressed_bytes_written.begin(), + compressed_bytes_written.end(), + comp_out.begin(), + [] __device__(size_t size) { + gpu_inflate_status_s status{}; + status.bytes_written = size; + return status; + }); + } catch (...) { + // If we reach this then there was an error in compressing so set an error status for each + // block + thrust::for_each(rmm::exec_policy(stream), + comp_out.begin(), + comp_out.end(), + [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + }; + + } else { + gpu_snap(comp_in.data(), comp_out.data(), num_compressed_blocks, stream); + } + } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( - strm_desc, comp_in, comp_out, compressed_data, comp_blk_size); + strm_desc, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); } } // namespace gpu diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 94d8de6561b..d6dbdbe6403 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,13 +52,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat uint32_t max_uncompressed_block_size = 0; uint32_t num_compressed_blocks = 0; uint32_t num_uncompressed_blocks = 0; - while (cur + 3 < end) { + while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; uint32_t uncompressed_size; gpu_inflate_input_s* init_ctl = nullptr; block_len >>= 1; - cur += 3; + cur += BLOCK_HEADER_SIZE; if (block_len > block_size || cur + block_len > end) { // Fatal num_compressed_blocks = 0; @@ -145,12 +145,12 @@ extern "C" __global__ void __launch_bounds__(128, 8) uint32_t num_compressed_blocks = 0; uint32_t max_compressed_blocks = s->info.num_compressed_blocks; - while (cur + 3 < end) { + while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; uint32_t uncompressed_size_est, uncompressed_size_actual; block_len >>= 1; - cur += 3; + cur += BLOCK_HEADER_SIZE; if (cur + block_len > end) { break; } if (is_uncompressed) { uncompressed_size_est = block_len; @@ -367,9 +367,11 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, for (;;) { uint32_t block_len, is_uncompressed; - if (cur + 3 > end || cur + 3 >= start + compressed_offset) { break; } + if (cur + BLOCK_HEADER_SIZE > end || cur + BLOCK_HEADER_SIZE >= start + compressed_offset) { + break; + } block_len = cur[0] | (cur[1] << 8) | (cur[2] << 16); - cur += 3; + cur += BLOCK_HEADER_SIZE; is_uncompressed = block_len & 1; block_len >>= 1; cur += block_len; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index e0018ed7166..8a0112deb76 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -36,6 +36,8 @@ #include #include +#include + #include #include #include @@ -999,10 +1001,10 @@ void writer::impl::write_index_stream(int32_t stripe_id, record.pos += stream.lengths[type]; while ((record.pos >= 0) && (record.blk_pos >= 0) && (static_cast(record.pos) >= compression_blocksize_) && - (record.comp_pos + 3 + comp_out[record.blk_pos].bytes_written < + (record.comp_pos + BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written < static_cast(record.comp_size))) { record.pos -= compression_blocksize_; - record.comp_pos += 3 + comp_out[record.blk_pos].bytes_written; + record.comp_pos += BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written; record.blk_pos += 1; } } @@ -1472,29 +1474,31 @@ void writer::impl::write(table_view const& table) } // Allocate intermediate output stream buffer - size_t compressed_bfr_size = 0; - size_t num_compressed_blocks = 0; - auto stream_output = [&]() { + size_t compressed_bfr_size = 0; + size_t num_compressed_blocks = 0; + size_t max_compressed_block_size = 0; + if (compression_kind_ != NONE) { + nvcompBatchedSnappyCompressGetMaxOutputChunkSize( + compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size); + } + auto stream_output = [&]() { size_t max_stream_size = 0; bool all_device_write = true; - for (size_t stripe_id = 0; stripe_id < segmentation.num_stripes(); stripe_id++) { - for (size_t i = 0; i < num_data_streams; i++) { // TODO range for (at least) - gpu::StripeStream* ss = &strm_descs[stripe_id][i]; - if (!out_sink_->is_device_write_preferred(ss->stream_size)) { all_device_write = false; } - size_t stream_size = ss->stream_size; - if (compression_kind_ != NONE) { - ss->first_block = num_compressed_blocks; - ss->bfr_offset = compressed_bfr_size; - - auto num_blocks = std::max( - (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); - stream_size += num_blocks * 3; - num_compressed_blocks += num_blocks; - compressed_bfr_size += stream_size; - } - max_stream_size = std::max(max_stream_size, stream_size); + for (auto& ss : strm_descs.host_view().flat_view()) { + if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; } + size_t stream_size = ss.stream_size; + if (compression_kind_ != NONE) { + ss.first_block = num_compressed_blocks; + ss.bfr_offset = compressed_bfr_size; + + auto num_blocks = std::max( + (stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1); + stream_size += num_blocks * BLOCK_HEADER_SIZE; + num_compressed_blocks += num_blocks; + compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks; } + max_stream_size = std::max(max_stream_size, stream_size); } if (all_device_write) { @@ -1519,10 +1523,11 @@ void writer::impl::write(table_view const& table) num_compressed_blocks, compression_kind_, compression_blocksize_, + max_compressed_block_size, strm_descs, enc_data.streams, - comp_in.device_ptr(), - comp_out.device_ptr(), + comp_in, + comp_out, stream); strm_descs.device_to_host(stream); comp_out.device_to_host(stream, true);