Partial cuIO GPU decompression refactor #10699

Merged Apr 28, 2022 · 35 commits
Changes from 27 commits
Commits:
a87293c
first pq impl; failing test
vuule Apr 13, 2022
acdbe05
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
vuule Apr 14, 2022
8e70573
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
vuule Apr 15, 2022
7c903f8
revert test changes
vuule Apr 15, 2022
9c6b569
refactor to avoid code repetition
vuule Apr 15, 2022
ff0f5d8
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
vuule Apr 18, 2022
79faabf
expand zstd pq tests
vuule Apr 19, 2022
1fccc62
constness improvement
vuule Apr 21, 2022
b072a63
remove zstd
vuule Apr 21, 2022
f6872d7
rename
vuule Apr 21, 2022
f519fd2
revert zstd test changes
vuule Apr 21, 2022
1140bb5
device_span for device_decompress_input source
vuule Apr 21, 2022
a2d585f
style
vuule Apr 21, 2022
7ff740b
use device_span for dst as well; fix comp tests
vuule Apr 21, 2022
d751123
meh
vuule Apr 21, 2022
73f6482
auto*
vuule Apr 21, 2022
31fcf30
remove commented debug code
vuule Apr 21, 2022
e94688b
style
vuule Apr 21, 2022
ed6b997
Merge branch 'fea-nvcomp-zstd' of http://github.com/vuule/cudf into f…
vuule Apr 25, 2022
87fe4b8
addressing code review
vuule Apr 25, 2022
65a648c
remove unrequired members of page_enc_state_s
vuule Apr 25, 2022
6c3b832
pointless commit to try and fix github diff
vuule Apr 25, 2022
4d3b524
rework compression args again
vuule Apr 25, 2022
8da6617
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
vuule Apr 25, 2022
2fc6dce
use hostdevice_vector in comp tests
vuule Apr 25, 2022
7e4818d
style
vuule Apr 25, 2022
0bbb3bb
kernel -> transform
vuule Apr 25, 2022
60101f1
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
vuule Apr 27, 2022
030dd18
address code reviews
vuule Apr 27, 2022
c49994b
expand host_to_device/device_to_host to allow subspan copies
vuule Apr 27, 2022
b17322d
bug fix
vuule Apr 27, 2022
268438e
Revert "expand host_to_device/device_to_host to allow subspan copies"
vuule Apr 27, 2022
2bd4f4e
decompress_page_data clean up
vuule Apr 27, 2022
f472225
lifetime bug fix
vuule Apr 27, 2022
f0f389e
Merge branch 'branch-22.06' of https://github.com/rapidsai/cudf into …
vuule Apr 28, 2022
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
@@ -297,6 +297,8 @@ add_library(
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
src/io/comp/nvcomp_adapter.cpp
src/io/comp/nvcomp_adapter.cu
src/io/comp/snap.cu
src/io/comp/uncomp.cpp
src/io/comp/unsnap.cu
82 changes: 43 additions & 39 deletions cpp/src/io/avro/reader_impl.cu
@@ -162,62 +162,66 @@ rmm::device_buffer decompress_data(datasource& source,
rmm::cuda_stream_view stream)
{
if (meta.codec == "deflate") {
size_t uncompressed_data_size = 0;
auto inflate_in = hostdevice_vector<device_span<uint8_t const>>(meta.block_list.size(), stream);
auto inflate_out = hostdevice_vector<device_span<uint8_t>>(meta.block_list.size(), stream);
auto inflate_stats = hostdevice_vector<decompress_status>(meta.block_list.size(), stream);

auto inflate_in = hostdevice_vector<gpu_inflate_input_s>(meta.block_list.size(), stream);
auto inflate_out = hostdevice_vector<gpu_inflate_status_s>(meta.block_list.size(), stream);
// Guess an initial maximum uncompressed block size. We estimate the compression factor is two
// and round up to the next multiple of 4096 bytes.
uint32_t const initial_blk_len = meta.max_block_size * 2 + (meta.max_block_size * 2) % 4096;
size_t const uncomp_size = initial_blk_len * meta.block_list.size();

// Guess an initial maximum uncompressed block size
uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff;
uncompressed_data_size = initial_blk_len * meta.block_list.size();
for (size_t i = 0; i < inflate_in.size(); ++i) {
inflate_in[i].dstSize = initial_blk_len;
}

rmm::device_buffer decomp_block_data(uncompressed_data_size, stream);
rmm::device_buffer decomp_block_data(uncomp_size, stream);

auto const base_offset = meta.block_list[0].offset;
for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) {
auto const src_pos = meta.block_list[i].offset - base_offset;

inflate_in[i].srcDevice = static_cast<uint8_t const*>(comp_block_data.data()) + src_pos;
inflate_in[i].srcSize = meta.block_list[i].size;
inflate_in[i].dstDevice = static_cast<uint8_t*>(decomp_block_data.data()) + dst_pos;
inflate_in[i] = {static_cast<uint8_t const*>(comp_block_data.data()) + src_pos,
meta.block_list[i].size};
inflate_out[i] = {static_cast<uint8_t*>(decomp_block_data.data()) + dst_pos, initial_blk_len};

// Update blocks offsets & sizes to refer to uncompressed data
meta.block_list[i].offset = dst_pos;
meta.block_list[i].size = static_cast<uint32_t>(inflate_in[i].dstSize);
meta.block_list[i].size = static_cast<uint32_t>(inflate_out[i].size());
dst_pos += meta.block_list[i].size;
}
inflate_in.host_to_device(stream);

for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) {
inflate_in.host_to_device(stream);
CUDF_CUDA_TRY(
cudaMemsetAsync(inflate_out.device_ptr(), 0, inflate_out.memory_size(), stream.value()));
CUDF_CUDA_TRY(gpuinflate(
inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), 0, stream));
inflate_out.device_to_host(stream, true);
inflate_out.host_to_device(stream);
CUDF_CUDA_TRY(cudaMemsetAsync(
inflate_stats.device_ptr(), 0, inflate_stats.memory_size(), stream.value()));
gpuinflate(inflate_in, inflate_out, inflate_stats, gzip_header_included::NO, stream);
inflate_stats.device_to_host(stream, true);

// Check if larger output is required, as it's not known ahead of time
if (loop_cnt == 0) {
size_t actual_uncompressed_size = 0;
for (size_t i = 0; i < meta.block_list.size(); i++) {
// If error status is 1 (buffer too small), the `bytes_written` field
// actually contains the uncompressed data size
if (inflate_out[i].status == 1 && inflate_out[i].bytes_written > inflate_in[i].dstSize) {
inflate_in[i].dstSize = inflate_out[i].bytes_written;
}
actual_uncompressed_size += inflate_in[i].dstSize;
}
if (actual_uncompressed_size > uncompressed_data_size) {
decomp_block_data.resize(actual_uncompressed_size, stream);
for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) {
auto dst_base = static_cast<uint8_t*>(decomp_block_data.data());
inflate_in[i].dstDevice = dst_base + dst_pos;

meta.block_list[i].offset = dst_pos;
meta.block_list[i].size = static_cast<uint32_t>(inflate_in[i].dstSize);
dst_pos += meta.block_list[i].size;
std::vector<size_t> actual_uncomp_sizes;
actual_uncomp_sizes.reserve(inflate_out.size());
std::transform(inflate_out.begin(),
inflate_out.end(),
inflate_stats.begin(),
std::back_inserter(actual_uncomp_sizes),
[](auto const& inf_out, auto const& inf_stats) {
// If error status is 1 (buffer too small), the `bytes_written` field
// actually contains the uncompressed data size
return inf_stats.status == 1
? std::max(inf_out.size(), inf_stats.bytes_written)
: inf_out.size();
});
auto const total_actual_uncomp_size =
std::accumulate(actual_uncomp_sizes.cbegin(), actual_uncomp_sizes.cend(), 0ul);
if (total_actual_uncomp_size > uncomp_size) {
decomp_block_data.resize(total_actual_uncomp_size, stream);
for (size_t i = 0; i < meta.block_list.size(); ++i) {
meta.block_list[i].offset =
i > 0 ? (meta.block_list[i - 1].size + meta.block_list[i - 1].offset) : 0;
meta.block_list[i].size = static_cast<uint32_t>(actual_uncomp_sizes[i]);

inflate_out[i] = {
static_cast<uint8_t*>(decomp_block_data.data()) + meta.block_list[i].offset,
meta.block_list[i].size};
}
} else {
break;
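Taken together, the Avro changes above replace the old gpu_inflate_input_s/gpu_inflate_status_s arrays with separate input-span, output-span, and status vectors, and retry once with larger buffers when a block reports status 1 (output too small). The following is an illustrative, condensed sketch of that call pattern, not code from the PR; num_blocks, src_ptr, src_size, dst_ptr, and dst_len are hypothetical placeholders standing in for the reader's own bookkeeping.

// Hedged sketch of the new batched-inflate interface used above. The types
// (device_span, decompress_status, hostdevice_vector) and the gpuinflate()
// signature are as shown in this diff; num_blocks, src_ptr, src_size,
// dst_ptr, and dst_len are placeholders.
auto inputs   = hostdevice_vector<device_span<uint8_t const>>(num_blocks, stream);
auto outputs  = hostdevice_vector<device_span<uint8_t>>(num_blocks, stream);
auto statuses = hostdevice_vector<decompress_status>(num_blocks, stream);

for (size_t i = 0; i < num_blocks; ++i) {
  inputs[i]  = {src_ptr(i), src_size(i)};  // compressed block in device memory
  outputs[i] = {dst_ptr(i), dst_len(i)};   // guessed output slot in device memory
}
inputs.host_to_device(stream);
outputs.host_to_device(stream);

gpuinflate(inputs, outputs, statuses, gzip_header_included::NO, stream);
statuses.device_to_host(stream, true);

// status == 1 means the guessed output buffer was too small; bytes_written
// then holds the required size, so the caller can grow the buffer, update the
// output spans, and run gpuinflate() once more (the two-pass loop above).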
69 changes: 35 additions & 34 deletions cpp/src/io/comp/debrotli.cu
@@ -1904,41 +1904,43 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction
*
* blockDim = {block_size,1,1}
*
* @param[in] inputs Source/Destination buffer information per block
* @param[out] outputs Decompressor status per block
* @param[in] inputs Source buffer information per block
* @param[in] outputs Destination buffer information per block
* @param[out] statuses Decompressor status per block
* @param scratch Intermediate device memory heap space (will be dynamically shared between blocks)
* @param scratch_size Size of scratch heap space (smaller sizes may result in serialization between
* blocks)
* @param count Number of blocks to decompress
*/
extern "C" __global__ void __launch_bounds__(block_size, 2)
gpu_debrotli_kernel(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
__global__ void __launch_bounds__(block_size, 2)
gpu_debrotli_kernel(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
uint8_t* scratch,
uint32_t scratch_size,
uint32_t count)
uint32_t scratch_size)
{
__shared__ __align__(16) debrotli_state_s state_g;

int t = threadIdx.x;
int z = blockIdx.x;
debrotli_state_s* const s = &state_g;

if (z >= count) { return; }
if (z >= inputs.size()) { return; }
// Thread0: initializes shared state and decode stream header
if (!t) {
auto const* src = static_cast<uint8_t const*>(inputs[z].srcDevice);
size_t src_size = inputs[z].srcSize;
auto const src = inputs[z].data();
size_t const src_size = inputs[z].size();
if (src && src_size >= 8) {
s->error = 0;
s->out = s->outbase = static_cast<uint8_t*>(inputs[z].dstDevice);
s->bytes_left = inputs[z].dstSize;
s->mtf_upper_bound = 63;
s->dist_rb[0] = 16;
s->dist_rb[1] = 15;
s->dist_rb[2] = 11;
s->dist_rb[3] = 4;
s->dist_rb_idx = 0;
s->error = 0;
s->out = outputs[z].data();
s->outbase = outputs[z].data();
s->bytes_left = outputs[z].size();
s->mtf_upper_bound = 63;
s->dist_rb[0] = 16;
s->dist_rb[1] = 15;
s->dist_rb[2] = 11;
s->dist_rb[3] = 4;
s->dist_rb_idx = 0;
s->p1 = s->p2 = 0;
initbits(s, src, src_size);
DecodeStreamHeader(s);
@@ -2015,9 +2017,9 @@ extern "C" __global__ void __launch_bounds__(block_size, 2)
__syncthreads();
// Output decompression status
if (!t) {
outputs[z].bytes_written = s->out - s->outbase;
outputs[z].status = s->error;
outputs[z].reserved = s->fb_size; // Return ext heap used by last block (statistics)
statuses[z].bytes_written = s->out - s->outbase;
statuses[z].status = s->error;
statuses[z].reserved = s->fb_size; // Return ext heap used by last block (statistics)
}
}

@@ -2075,20 +2077,21 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)
#include <stdio.h>
#endif

cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs,
gpu_inflate_status_s* outputs,
void* scratch,
size_t scratch_size,
int count,
rmm::cuda_stream_view stream)
void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<decompress_status> statuses,
void* scratch,
size_t scratch_size,
rmm::cuda_stream_view stream)
{
uint32_t count32 = (count > 0) ? count : 0;
auto const count = inputs.size();
uint32_t fb_heap_size;
auto* scratch_u8 = static_cast<uint8_t*>(scratch);
dim3 dim_block(block_size, 1);
dim3 dim_grid(count32, 1); // TODO: Check max grid dimensions vs max expected count
dim3 dim_grid(count, 1); // TODO: Check max grid dimensions vs max expected count

if (scratch_size < sizeof(brotli_dictionary_s)) { return cudaErrorLaunchOutOfResources; }
CUDF_EXPECTS(scratch_size >= sizeof(brotli_dictionary_s),
"Insufficient scratch space for debrotli");
scratch_size = min(scratch_size, (size_t)0xffffffffu);
fb_heap_size = (uint32_t)((scratch_size - sizeof(brotli_dictionary_s)) & ~0xf);

@@ -2101,7 +2104,7 @@ cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs,
cudaMemcpyHostToDevice,
stream.value()));
gpu_debrotli_kernel<<<dim_grid, dim_block, 0, stream.value()>>>(
inputs, outputs, scratch_u8, fb_heap_size, count32);
inputs, outputs, statuses, scratch_u8, fb_heap_size);
#if DUMP_FB_HEAP
uint32_t dump[2];
uint32_t cur = 0;
@@ -2114,8 +2117,6 @@ cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs,
cur = (dump[0] > cur) ? dump[0] : 0xffffffffu;
}
#endif

return cudaSuccess;
}

} // namespace io
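For reference, here is a caller-side sketch of the reworked gpu_debrotli() entry point (not taken verbatim from the PR): the scratch-sizing helper get_gpu_debrotli_scratch_size() and the span-based signature come from this diff, while the inputs, outputs, and statuses spans are assumed to have been populated by the calling reader, as in the Avro example above.

// Hedged usage sketch, assuming `inputs`, `outputs`, and `statuses` are
// device spans already filled in by the caller.
auto const scratch_size = get_gpu_debrotli_scratch_size(static_cast<int>(inputs.size()));
rmm::device_buffer scratch(scratch_size, stream);

// The new interface throws via CUDF_EXPECTS when the scratch buffer is too
// small, instead of returning cudaErrorLaunchOutOfResources as the old
// cudaError_t-returning interface did.
gpu_debrotli(inputs, outputs, statuses, scratch.data(), scratch.size(), stream);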