Partial cuIO GPU decompression refactor (#10699)
Required groundwork for expanding nvcomp integration in the future.

- [x] Move the nvcomp integration in the ORC and Parquet readers into common code. This enables nvcomp use for multiple compression types without code duplication.
- [x] Refactor `gpu_inflate_input_s` to facilitate a unified host/device decompressor interface (see the interface sketch below). This enables further changes to unify the CPU and GPU decompression APIs, which in turn enables ZSTD use in ORC.
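
For reference, a sketch of the refactored interface as it appears in this diff. The signatures are taken from the changes below; the parameter names, `decompress_status` field types, and `gpuinflate`'s return type are assumptions inferred from the call sites:

```cpp
// Per-block result record written by the GPU decompressors (field types assumed).
struct decompress_status {
  uint64_t bytes_written;  // bytes produced; when status == 1, the required output size
  uint32_t status;         // 0 = success, 1 = output buffer too small
  uint32_t reserved;
};

// Per-block source/destination buffers are now passed as spans of spans instead of
// arrays of gpu_inflate_input_s / gpu_inflate_status_s:
void gpuinflate(device_span<device_span<uint8_t const> const> inputs,
                device_span<device_span<uint8_t> const> outputs,
                device_span<decompress_status> statuses,
                gzip_header_included parse_hdr,
                rmm::cuda_stream_view stream);

void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
                  device_span<device_span<uint8_t> const> outputs,
                  device_span<decompress_status> statuses,
                  void* scratch,
                  size_t scratch_size,
                  rmm::cuda_stream_view stream);
```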

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Robert Maynard (https://github.com/robertmaynard)
  - Elias Stehle (https://github.com/elstehle)
  - Bradley Dice (https://github.com/bdice)

URL: #10699
vuule authored Apr 28, 2022
1 parent 9ac2477 commit 280acdf
Showing 23 changed files with 834 additions and 711 deletions.
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
@@ -302,6 +302,8 @@ add_library(
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
src/io/comp/nvcomp_adapter.cpp
src/io/comp/nvcomp_adapter.cu
src/io/comp/snap.cu
src/io/comp/uncomp.cpp
src/io/comp/unsnap.cu
82 changes: 43 additions & 39 deletions cpp/src/io/avro/reader_impl.cu
@@ -162,62 +162,66 @@ rmm::device_buffer decompress_data(datasource& source,
rmm::cuda_stream_view stream)
{
if (meta.codec == "deflate") {
size_t uncompressed_data_size = 0;
auto inflate_in = hostdevice_vector<device_span<uint8_t const>>(meta.block_list.size(), stream);
auto inflate_out = hostdevice_vector<device_span<uint8_t>>(meta.block_list.size(), stream);
auto inflate_stats = hostdevice_vector<decompress_status>(meta.block_list.size(), stream);

auto inflate_in = hostdevice_vector<gpu_inflate_input_s>(meta.block_list.size(), stream);
auto inflate_out = hostdevice_vector<gpu_inflate_status_s>(meta.block_list.size(), stream);
// Guess an initial maximum uncompressed block size. We estimate the compression factor is two
// and round up to the next multiple of 4096 bytes.
uint32_t const initial_blk_len = meta.max_block_size * 2 + (meta.max_block_size * 2) % 4096;
size_t const uncomp_size = initial_blk_len * meta.block_list.size();

// Guess an initial maximum uncompressed block size
uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff;
uncompressed_data_size = initial_blk_len * meta.block_list.size();
for (size_t i = 0; i < inflate_in.size(); ++i) {
inflate_in[i].dstSize = initial_blk_len;
}

rmm::device_buffer decomp_block_data(uncompressed_data_size, stream);
rmm::device_buffer decomp_block_data(uncomp_size, stream);

auto const base_offset = meta.block_list[0].offset;
for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) {
auto const src_pos = meta.block_list[i].offset - base_offset;

inflate_in[i].srcDevice = static_cast<uint8_t const*>(comp_block_data.data()) + src_pos;
inflate_in[i].srcSize = meta.block_list[i].size;
inflate_in[i].dstDevice = static_cast<uint8_t*>(decomp_block_data.data()) + dst_pos;
inflate_in[i] = {static_cast<uint8_t const*>(comp_block_data.data()) + src_pos,
meta.block_list[i].size};
inflate_out[i] = {static_cast<uint8_t*>(decomp_block_data.data()) + dst_pos, initial_blk_len};

// Update blocks offsets & sizes to refer to uncompressed data
meta.block_list[i].offset = dst_pos;
meta.block_list[i].size = static_cast<uint32_t>(inflate_in[i].dstSize);
meta.block_list[i].size = static_cast<uint32_t>(inflate_out[i].size());
dst_pos += meta.block_list[i].size;
}
inflate_in.host_to_device(stream);

for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) {
inflate_in.host_to_device(stream);
CUDF_CUDA_TRY(
cudaMemsetAsync(inflate_out.device_ptr(), 0, inflate_out.memory_size(), stream.value()));
CUDF_CUDA_TRY(gpuinflate(
inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), 0, stream));
inflate_out.device_to_host(stream, true);
inflate_out.host_to_device(stream);
CUDF_CUDA_TRY(cudaMemsetAsync(
inflate_stats.device_ptr(), 0, inflate_stats.memory_size(), stream.value()));
gpuinflate(inflate_in, inflate_out, inflate_stats, gzip_header_included::NO, stream);
inflate_stats.device_to_host(stream, true);

// Check if larger output is required, as it's not known ahead of time
if (loop_cnt == 0) {
size_t actual_uncompressed_size = 0;
for (size_t i = 0; i < meta.block_list.size(); i++) {
// If error status is 1 (buffer too small), the `bytes_written` field
// actually contains the uncompressed data size
if (inflate_out[i].status == 1 && inflate_out[i].bytes_written > inflate_in[i].dstSize) {
inflate_in[i].dstSize = inflate_out[i].bytes_written;
}
actual_uncompressed_size += inflate_in[i].dstSize;
}
if (actual_uncompressed_size > uncompressed_data_size) {
decomp_block_data.resize(actual_uncompressed_size, stream);
for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) {
auto dst_base = static_cast<uint8_t*>(decomp_block_data.data());
inflate_in[i].dstDevice = dst_base + dst_pos;

meta.block_list[i].offset = dst_pos;
meta.block_list[i].size = static_cast<uint32_t>(inflate_in[i].dstSize);
dst_pos += meta.block_list[i].size;
std::vector<size_t> actual_uncomp_sizes;
actual_uncomp_sizes.reserve(inflate_out.size());
std::transform(inflate_out.begin(),
inflate_out.end(),
inflate_stats.begin(),
std::back_inserter(actual_uncomp_sizes),
[](auto const& inf_out, auto const& inf_stats) {
// If error status is 1 (buffer too small), the `bytes_written` field
// actually contains the uncompressed data size
return inf_stats.status == 1
? std::max(inf_out.size(), inf_stats.bytes_written)
: inf_out.size();
});
auto const total_actual_uncomp_size =
std::accumulate(actual_uncomp_sizes.cbegin(), actual_uncomp_sizes.cend(), 0ul);
if (total_actual_uncomp_size > uncomp_size) {
decomp_block_data.resize(total_actual_uncomp_size, stream);
for (size_t i = 0; i < meta.block_list.size(); ++i) {
meta.block_list[i].offset =
i > 0 ? (meta.block_list[i - 1].size + meta.block_list[i - 1].offset) : 0;
meta.block_list[i].size = static_cast<uint32_t>(actual_uncomp_sizes[i]);

inflate_out[i] = {
static_cast<uint8_t*>(decomp_block_data.data()) + meta.block_list[i].offset,
meta.block_list[i].size};
}
} else {
break;
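
The Avro change above follows a grow-and-retry pattern: guess per-block output sizes up front, run `gpuinflate` once, then use the per-block statuses to learn the true uncompressed sizes (blocks that fail with status 1 report theirs in `bytes_written`), resize the output buffer, and decompress again. A minimal host-side sketch of the size computation, using hypothetical stand-in types rather than the actual cuIO headers:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical stand-ins for the device_span / decompress_status types used above.
struct out_span {
  uint8_t* data;
  size_t size;
};
struct decompress_status {
  uint64_t bytes_written;
  uint32_t status;  // 1 == output buffer was too small
  uint32_t reserved;
};

// After the first decompression pass, compute how much output space is really needed:
// blocks that failed with status 1 report their true uncompressed size in bytes_written.
size_t total_required_output(std::vector<out_span> const& outputs,
                             std::vector<decompress_status> const& stats)
{
  std::vector<size_t> actual(outputs.size());
  std::transform(outputs.begin(), outputs.end(), stats.begin(), actual.begin(),
                 [](out_span const& out, decompress_status const& st) {
                   return st.status == 1 ? std::max<size_t>(out.size, st.bytes_written)
                                         : out.size;
                 });
  return std::accumulate(actual.cbegin(), actual.cend(), std::size_t{0});
}
```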
75 changes: 38 additions & 37 deletions cpp/src/io/comp/debrotli.cu
@@ -1904,41 +1904,42 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction
*
* blockDim = {block_size,1,1}
*
* @param[in] inputs Source/Destination buffer information per block
* @param[out] outputs Decompressor status per block
* @param[in] inputs Source buffer per block
* @param[out] outputs Destination buffer per block
* @param[out] statuses Decompressor status per block
* @param scratch Intermediate device memory heap space (will be dynamically shared between blocks)
* @param scratch_size Size of scratch heap space (smaller sizes may result in serialization between
*blocks)
* @param count Number of blocks to decompress
* blocks)
*/
extern "C" __global__ void __launch_bounds__(block_size, 2)
gpu_debrotli_kernel(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
__global__ void __launch_bounds__(block_size, 2)
gpu_debrotli_kernel(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
uint8_t* scratch,
uint32_t scratch_size,
uint32_t count)
uint32_t scratch_size)
{
__shared__ __align__(16) debrotli_state_s state_g;

int t = threadIdx.x;
int z = blockIdx.x;
auto const block_id = blockIdx.x;
debrotli_state_s* const s = &state_g;

if (z >= count) { return; }
if (block_id >= inputs.size()) { return; }
// Thread0: initializes shared state and decode stream header
if (!t) {
auto const* src = static_cast<uint8_t const*>(inputs[z].srcDevice);
size_t src_size = inputs[z].srcSize;
auto const src = inputs[block_id].data();
auto const src_size = inputs[block_id].size();
if (src && src_size >= 8) {
s->error = 0;
s->out = s->outbase = static_cast<uint8_t*>(inputs[z].dstDevice);
s->bytes_left = inputs[z].dstSize;
s->mtf_upper_bound = 63;
s->dist_rb[0] = 16;
s->dist_rb[1] = 15;
s->dist_rb[2] = 11;
s->dist_rb[3] = 4;
s->dist_rb_idx = 0;
s->error = 0;
s->out = outputs[block_id].data();
s->outbase = s->out;
s->bytes_left = outputs[block_id].size();
s->mtf_upper_bound = 63;
s->dist_rb[0] = 16;
s->dist_rb[1] = 15;
s->dist_rb[2] = 11;
s->dist_rb[3] = 4;
s->dist_rb_idx = 0;
s->p1 = s->p2 = 0;
initbits(s, src, src_size);
DecodeStreamHeader(s);
@@ -2015,9 +2016,10 @@ extern "C" __global__ void __launch_bounds__(block_size, 2)
__syncthreads();
// Output decompression status
if (!t) {
outputs[z].bytes_written = s->out - s->outbase;
outputs[z].status = s->error;
outputs[z].reserved = s->fb_size; // Return ext heap used by last block (statistics)
statuses[block_id].bytes_written = s->out - s->outbase;
statuses[block_id].status = s->error;
// Return ext heap used by last block (statistics)
statuses[block_id].reserved = s->fb_size;
}
}

@@ -2075,20 +2077,21 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)
#include <stdio.h>
#endif

cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
void* scratch,
size_t scratch_size,
int count,
rmm::cuda_stream_view stream)
void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
void* scratch,
size_t scratch_size,
rmm::cuda_stream_view stream)
{
uint32_t count32 = (count > 0) ? count : 0;
auto const count = inputs.size();
uint32_t fb_heap_size;
auto* scratch_u8 = static_cast<uint8_t*>(scratch);
dim3 dim_block(block_size, 1);
dim3 dim_grid(count32, 1); // TODO: Check max grid dimensions vs max expected count
dim3 dim_grid(count, 1); // TODO: Check max grid dimensions vs max expected count

if (scratch_size < sizeof(brotli_dictionary_s)) { return cudaErrorLaunchOutOfResources; }
CUDF_EXPECTS(scratch_size >= sizeof(brotli_dictionary_s),
"Insufficient scratch space for debrotli");
scratch_size = min(scratch_size, (size_t)0xffffffffu);
fb_heap_size = (uint32_t)((scratch_size - sizeof(brotli_dictionary_s)) & ~0xf);

@@ -2101,7 +2104,7 @@ cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs,
cudaMemcpyHostToDevice,
stream.value()));
gpu_debrotli_kernel<<<dim_grid, dim_block, 0, stream.value()>>>(
inputs, outputs, scratch_u8, fb_heap_size, count32);
inputs, outputs, statuses, scratch_u8, fb_heap_size);
#if DUMP_FB_HEAP
uint32_t dump[2];
uint32_t cur = 0;
Expand All @@ -2114,8 +2117,6 @@ cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs,
cur = (dump[0] > cur) ? dump[0] : 0xffffffffu;
}
#endif

return cudaSuccess;
}

} // namespace io
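
For context, a hypothetical call site for the reworked `gpu_debrotli` wrapper above. `get_gpu_debrotli_scratch_size` and the `gpu_debrotli` signature come from this file; the helper name and the assumption that the caller has already prepared the spans and stream are placeholders:

```cpp
// Hypothetical driver; everything except get_gpu_debrotli_scratch_size and
// gpu_debrotli is a placeholder.
void decompress_brotli_blocks(device_span<device_span<uint8_t const> const> inputs,
                              device_span<device_span<uint8_t> const> outputs,
                              device_span<decompress_status> statuses,
                              rmm::cuda_stream_view stream)
{
  // Scratch sizing still scales with the number of blocks, as before the refactor.
  auto const scratch_size = get_gpu_debrotli_scratch_size(static_cast<int>(inputs.size()));
  rmm::device_buffer scratch(scratch_size, stream);

  // Errors are now reported per block through `statuses` (plus CUDF_EXPECTS on the
  // scratch size) instead of a cudaError_t return value.
  gpu_debrotli(inputs, outputs, statuses, scratch.data(), scratch.size(), stream);
}
```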