diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 91f67fd0420..15caaec9bec 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -302,6 +302,8 @@ add_library( src/io/comp/cpu_unbz2.cpp src/io/comp/debrotli.cu src/io/comp/gpuinflate.cu + src/io/comp/nvcomp_adapter.cpp + src/io/comp/nvcomp_adapter.cu src/io/comp/snap.cu src/io/comp/uncomp.cpp src/io/comp/unsnap.cu diff --git a/cpp/src/io/avro/reader_impl.cu b/cpp/src/io/avro/reader_impl.cu index 5885b61b35b..556ca6b9d80 100644 --- a/cpp/src/io/avro/reader_impl.cu +++ b/cpp/src/io/avro/reader_impl.cu @@ -162,62 +162,66 @@ rmm::device_buffer decompress_data(datasource& source, rmm::cuda_stream_view stream) { if (meta.codec == "deflate") { - size_t uncompressed_data_size = 0; + auto inflate_in = hostdevice_vector>(meta.block_list.size(), stream); + auto inflate_out = hostdevice_vector>(meta.block_list.size(), stream); + auto inflate_stats = hostdevice_vector(meta.block_list.size(), stream); - auto inflate_in = hostdevice_vector(meta.block_list.size(), stream); - auto inflate_out = hostdevice_vector(meta.block_list.size(), stream); + // Guess an initial maximum uncompressed block size. We estimate the compression factor is two + // and round up to the next multiple of 4096 bytes. + uint32_t const initial_blk_len = meta.max_block_size * 2 + (meta.max_block_size * 2) % 4096; + size_t const uncomp_size = initial_blk_len * meta.block_list.size(); - // Guess an initial maximum uncompressed block size - uint32_t initial_blk_len = (meta.max_block_size * 2 + 0xfff) & ~0xfff; - uncompressed_data_size = initial_blk_len * meta.block_list.size(); - for (size_t i = 0; i < inflate_in.size(); ++i) { - inflate_in[i].dstSize = initial_blk_len; - } - - rmm::device_buffer decomp_block_data(uncompressed_data_size, stream); + rmm::device_buffer decomp_block_data(uncomp_size, stream); auto const base_offset = meta.block_list[0].offset; for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { auto const src_pos = meta.block_list[i].offset - base_offset; - inflate_in[i].srcDevice = static_cast(comp_block_data.data()) + src_pos; - inflate_in[i].srcSize = meta.block_list[i].size; - inflate_in[i].dstDevice = static_cast(decomp_block_data.data()) + dst_pos; + inflate_in[i] = {static_cast(comp_block_data.data()) + src_pos, + meta.block_list[i].size}; + inflate_out[i] = {static_cast(decomp_block_data.data()) + dst_pos, initial_blk_len}; // Update blocks offsets & sizes to refer to uncompressed data meta.block_list[i].offset = dst_pos; - meta.block_list[i].size = static_cast(inflate_in[i].dstSize); + meta.block_list[i].size = static_cast(inflate_out[i].size()); dst_pos += meta.block_list[i].size; } + inflate_in.host_to_device(stream); for (int loop_cnt = 0; loop_cnt < 2; loop_cnt++) { - inflate_in.host_to_device(stream); - CUDF_CUDA_TRY( - cudaMemsetAsync(inflate_out.device_ptr(), 0, inflate_out.memory_size(), stream.value())); - CUDF_CUDA_TRY(gpuinflate( - inflate_in.device_ptr(), inflate_out.device_ptr(), inflate_in.size(), 0, stream)); - inflate_out.device_to_host(stream, true); + inflate_out.host_to_device(stream); + CUDF_CUDA_TRY(cudaMemsetAsync( + inflate_stats.device_ptr(), 0, inflate_stats.memory_size(), stream.value())); + gpuinflate(inflate_in, inflate_out, inflate_stats, gzip_header_included::NO, stream); + inflate_stats.device_to_host(stream, true); // Check if larger output is required, as it's not known ahead of time if (loop_cnt == 0) { - size_t actual_uncompressed_size = 0; - for (size_t i = 0; i < meta.block_list.size(); i++) { - // 
If error status is 1 (buffer too small), the `bytes_written` field - // is actually contains the uncompressed data size - if (inflate_out[i].status == 1 && inflate_out[i].bytes_written > inflate_in[i].dstSize) { - inflate_in[i].dstSize = inflate_out[i].bytes_written; - } - actual_uncompressed_size += inflate_in[i].dstSize; - } - if (actual_uncompressed_size > uncompressed_data_size) { - decomp_block_data.resize(actual_uncompressed_size, stream); - for (size_t i = 0, dst_pos = 0; i < meta.block_list.size(); i++) { - auto dst_base = static_cast(decomp_block_data.data()); - inflate_in[i].dstDevice = dst_base + dst_pos; - - meta.block_list[i].offset = dst_pos; - meta.block_list[i].size = static_cast(inflate_in[i].dstSize); - dst_pos += meta.block_list[i].size; + std::vector actual_uncomp_sizes; + actual_uncomp_sizes.reserve(inflate_out.size()); + std::transform(inflate_out.begin(), + inflate_out.end(), + inflate_stats.begin(), + std::back_inserter(actual_uncomp_sizes), + [](auto const& inf_out, auto const& inf_stats) { + // If error status is 1 (buffer too small), the `bytes_written` field + // actually contains the uncompressed data size + return inf_stats.status == 1 + ? std::max(inf_out.size(), inf_stats.bytes_written) + : inf_out.size(); + }); + auto const total_actual_uncomp_size = + std::accumulate(actual_uncomp_sizes.cbegin(), actual_uncomp_sizes.cend(), 0ul); + if (total_actual_uncomp_size > uncomp_size) { + decomp_block_data.resize(total_actual_uncomp_size, stream); + for (size_t i = 0; i < meta.block_list.size(); ++i) { + meta.block_list[i].offset = + i > 0 ? (meta.block_list[i - 1].size + meta.block_list[i - 1].offset) : 0; + meta.block_list[i].size = static_cast(actual_uncomp_sizes[i]); + + inflate_out[i] = { + static_cast(decomp_block_data.data()) + meta.block_list[i].offset, + meta.block_list[i].size}; } } else { break; diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu index 631cf19b2aa..cf4d1b0e0f4 100644 --- a/cpp/src/io/comp/debrotli.cu +++ b/cpp/src/io/comp/debrotli.cu @@ -1904,41 +1904,42 @@ static __device__ void ProcessCommands(debrotli_state_s* s, const brotli_diction * * blockDim = {block_size,1,1} * - * @param[in] inputs Source/Destination buffer information per block - * @param[out] outputs Decompressor status per block + * @param[in] inputs Source buffer per block + * @param[out] outputs Destination buffer per block + * @param[out] statuses Decompressor status per block * @param scratch Intermediate device memory heap space (will be dynamically shared between blocks) * @param scratch_size Size of scratch heap space (smaller sizes may result in serialization between - *blocks) - * @param count Number of blocks to decompress + * blocks) */ -extern "C" __global__ void __launch_bounds__(block_size, 2) - gpu_debrotli_kernel(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, +__global__ void __launch_bounds__(block_size, 2) + gpu_debrotli_kernel(device_span const> inputs, + device_span const> outputs, + device_span statuses, uint8_t* scratch, - uint32_t scratch_size, - uint32_t count) + uint32_t scratch_size) { __shared__ __align__(16) debrotli_state_s state_g; int t = threadIdx.x; - int z = blockIdx.x; + auto const block_id = blockIdx.x; debrotli_state_s* const s = &state_g; - if (z >= count) { return; } + if (block_id >= inputs.size()) { return; } // Thread0: initializes shared state and decode stream header if (!t) { - auto const* src = static_cast(inputs[z].srcDevice); - size_t src_size = inputs[z].srcSize; + auto const src = 
inputs[block_id].data(); + auto const src_size = inputs[block_id].size(); if (src && src_size >= 8) { - s->error = 0; - s->out = s->outbase = static_cast(inputs[z].dstDevice); - s->bytes_left = inputs[z].dstSize; - s->mtf_upper_bound = 63; - s->dist_rb[0] = 16; - s->dist_rb[1] = 15; - s->dist_rb[2] = 11; - s->dist_rb[3] = 4; - s->dist_rb_idx = 0; + s->error = 0; + s->out = outputs[block_id].data(); + s->outbase = s->out; + s->bytes_left = outputs[block_id].size(); + s->mtf_upper_bound = 63; + s->dist_rb[0] = 16; + s->dist_rb[1] = 15; + s->dist_rb[2] = 11; + s->dist_rb[3] = 4; + s->dist_rb_idx = 0; s->p1 = s->p2 = 0; initbits(s, src, src_size); DecodeStreamHeader(s); @@ -2015,9 +2016,10 @@ extern "C" __global__ void __launch_bounds__(block_size, 2) __syncthreads(); // Output decompression status if (!t) { - outputs[z].bytes_written = s->out - s->outbase; - outputs[z].status = s->error; - outputs[z].reserved = s->fb_size; // Return ext heap used by last block (statistics) + statuses[block_id].bytes_written = s->out - s->outbase; + statuses[block_id].status = s->error; + // Return ext heap used by last block (statistics) + statuses[block_id].reserved = s->fb_size; } } @@ -2075,20 +2077,21 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs) #include #endif -cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - void* scratch, - size_t scratch_size, - int count, - rmm::cuda_stream_view stream) +void gpu_debrotli(device_span const> inputs, + device_span const> outputs, + device_span statuses, + void* scratch, + size_t scratch_size, + rmm::cuda_stream_view stream) { - uint32_t count32 = (count > 0) ? count : 0; + auto const count = inputs.size(); uint32_t fb_heap_size; auto* scratch_u8 = static_cast(scratch); dim3 dim_block(block_size, 1); - dim3 dim_grid(count32, 1); // TODO: Check max grid dimensions vs max expected count + dim3 dim_grid(count, 1); // TODO: Check max grid dimensions vs max expected count - if (scratch_size < sizeof(brotli_dictionary_s)) { return cudaErrorLaunchOutOfResources; } + CUDF_EXPECTS(scratch_size >= sizeof(brotli_dictionary_s), + "Insufficient scratch space for debrotli"); scratch_size = min(scratch_size, (size_t)0xffffffffu); fb_heap_size = (uint32_t)((scratch_size - sizeof(brotli_dictionary_s)) & ~0xf); @@ -2101,7 +2104,7 @@ cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs, cudaMemcpyHostToDevice, stream.value())); gpu_debrotli_kernel<<>>( - inputs, outputs, scratch_u8, fb_heap_size, count32); + inputs, outputs, statuses, scratch_u8, fb_heap_size); #if DUMP_FB_HEAP uint32_t dump[2]; uint32_t cur = 0; @@ -2114,8 +2117,6 @@ cudaError_t __host__ gpu_debrotli(gpu_inflate_input_s* inputs, cur = (dump[0] > cur) ? dump[0] : 0xffffffffu; } #endif - - return cudaSuccess; } } // namespace io diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu index 508e960430d..0d33158da2b 100644 --- a/cpp/src/io/comp/gpuinflate.cu +++ b/cpp/src/io/comp/gpuinflate.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
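
The Avro and Brotli changes above are representative of the pattern used throughout this patch: the packed gpu_inflate_input_s array is replaced by three parallel lists — input spans, output spans, and decompress_status entries — that are handed directly to the decompression kernels. A minimal sketch of a call site under the new interface, assuming a hostdevice_vector that converts to device_span as in the Avro reader above (comp_data, decomp_data, the offset/size arrays, and num_blocks are illustrative placeholders, not names from the patch):

// Sketch: drive the span-based gpuinflate interface from host code.
auto inputs   = hostdevice_vector<device_span<uint8_t const>>(num_blocks, stream);
auto outputs  = hostdevice_vector<device_span<uint8_t>>(num_blocks, stream);
auto statuses = hostdevice_vector<decompress_status>(num_blocks, stream);

for (size_t i = 0; i < num_blocks; ++i) {
  // Each span is a {pointer, size} pair into the compressed/uncompressed buffers.
  inputs[i]  = {static_cast<uint8_t const*>(comp_data.data()) + src_offsets[i], src_sizes[i]};
  outputs[i] = {static_cast<uint8_t*>(decomp_data.data()) + dst_offsets[i], dst_sizes[i]};
}
inputs.host_to_device(stream);
outputs.host_to_device(stream);

gpuinflate(inputs, outputs, statuses, gzip_header_included::NO, stream);
statuses.device_to_host(stream, true);  // per-block result: status == 0 on success
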
@@ -124,8 +124,8 @@ struct inflate_state_s { uint8_t* outbase; ///< start of output buffer uint8_t* outend; ///< end of output buffer // Input state - uint8_t* cur; ///< input buffer - uint8_t* end; ///< end of input buffer + uint8_t const* cur; ///< input buffer + uint8_t const* end; ///< end of input buffer uint2 bitbuf; ///< bit buffer (64-bit) uint32_t bitpos; ///< position in bit buffer @@ -180,10 +180,10 @@ inline __device__ void skipbits(inflate_state_s* s, uint32_t n) { uint32_t bitpos = s->bitpos + n; if (bitpos >= 32) { - uint8_t* cur = s->cur + 8; - s->bitbuf.x = s->bitbuf.y; - s->bitbuf.y = (cur < s->end) ? *reinterpret_cast(cur) : 0; - s->cur = cur - 4; + auto cur = s->cur + 8; + s->bitbuf.x = s->bitbuf.y; + s->bitbuf.y = (cur < s->end) ? *reinterpret_cast(cur) : 0; + s->cur = cur - 4; bitpos &= 0x1f; } s->bitpos = bitpos; @@ -510,8 +510,8 @@ __device__ void decode_symbols(inflate_state_s* s) { uint32_t bitpos = s->bitpos; uint2 bitbuf = s->bitbuf; - uint8_t* cur = s->cur; - uint8_t* end = s->end; + auto cur = s->cur; + auto end = s->end; int32_t batch = 0; int32_t sym, batch_len; @@ -871,13 +871,11 @@ __device__ int init_stored(inflate_state_s* s) /// Copy bytes from stored block to destination __device__ void copy_stored(inflate_state_s* s, int t) { - int len = s->stored_blk_len; - uint8_t* cur = s->cur + (s->bitpos >> 3); - uint8_t* out = s->out; - uint8_t* outend = s->outend; - uint8_t* cur4; - int slow_bytes = min(len, (int)((16 - (size_t)out) & 0xf)); - int fast_bytes, bitpos; + auto len = s->stored_blk_len; + auto cur = s->cur + s->bitpos / 8; + auto out = s->out; + auto outend = s->outend; + auto const slow_bytes = min(len, (int)((16 - reinterpret_cast(out)) % 16)); // Slow copy until output is 16B aligned if (slow_bytes) { @@ -890,11 +888,11 @@ __device__ void copy_stored(inflate_state_s* s, int t) out += slow_bytes; len -= slow_bytes; } - fast_bytes = len; + auto fast_bytes = len; if (out < outend) { fast_bytes = (int)min((size_t)fast_bytes, (outend - out)); } fast_bytes &= ~0xf; - bitpos = ((int)(3 & (size_t)cur)) << 3; - cur4 = cur - (bitpos >> 3); + auto bitpos = ((int)((size_t)cur % 4)) * 8; + auto cur4 = cur - (bitpos / 8); if (out < outend) { // Fast copy 16 bytes at a time for (int i = t * 16; i < fast_bytes; i += blockDim.x * 16) { @@ -926,13 +924,13 @@ __device__ void copy_stored(inflate_state_s* s, int t) __syncthreads(); if (t == 0) { // Reset bitstream to end of block - uint8_t* p = cur + len; + auto p = cur + len; auto prefix_bytes = (uint32_t)(((size_t)p) & 3); p -= prefix_bytes; s->cur = p; - s->bitbuf.x = (p < s->end) ? *reinterpret_cast(p) : 0; + s->bitbuf.x = (p < s->end) ? *reinterpret_cast(p) : 0; p += 4; - s->bitbuf.y = (p < s->end) ? *reinterpret_cast(p) : 0; + s->bitbuf.y = (p < s->end) ? 
*reinterpret_cast(p) : 0; s->bitpos = prefix_bytes * 8; s->out = out; } @@ -1021,12 +1019,16 @@ __device__ int parse_gzip_header(const uint8_t* src, size_t src_size) * * @tparam block_size Thread block dimension for this call * @param inputs Source and destination buffer information per block - * @param outputs Decompression status buffer per block + * @param outputs Destination buffer information per block + * @param statuses Decompression status buffer per block * @param parse_hdr If nonzero, indicates that the compressed bitstream includes a GZIP header */ template __global__ void __launch_bounds__(block_size) - inflate_kernel(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs, int parse_hdr) + inflate_kernel(device_span const> inputs, + device_span const> outputs, + device_span statuses, + gzip_header_included parse_hdr) { __shared__ __align__(16) inflate_state_s state_g; @@ -1035,12 +1037,11 @@ __global__ void __launch_bounds__(block_size) inflate_state_s* state = &state_g; if (!t) { - auto* p = const_cast(static_cast(inputs[z].srcDevice)); - size_t src_size = inputs[z].srcSize; - uint32_t prefix_bytes; + auto p = inputs[z].data(); + auto src_size = inputs[z].size(); // Parse header if needed state->err = 0; - if (parse_hdr) { + if (parse_hdr == gzip_header_included::YES) { int hdr_len = parse_gzip_header(p, src_size); src_size = (src_size >= 8) ? src_size - 8 : 0; // ignore footer if (hdr_len >= 0) { @@ -1051,16 +1052,16 @@ __global__ void __launch_bounds__(block_size) } } // Initialize shared state - state->out = const_cast(static_cast(inputs[z].dstDevice)); - state->outbase = state->out; - state->outend = state->out + inputs[z].dstSize; - state->end = p + src_size; - prefix_bytes = (uint32_t)(((size_t)p) & 3); + state->out = outputs[z].data(); + state->outbase = state->out; + state->outend = state->out + outputs[z].size(); + state->end = p + src_size; + auto const prefix_bytes = (uint32_t)(((size_t)p) & 3); p -= prefix_bytes; state->cur = p; - state->bitbuf.x = (p < state->end) ? *reinterpret_cast(p) : 0; + state->bitbuf.x = (p < state->end) ? *reinterpret_cast(p) : 0; p += 4; - state->bitbuf.y = (p < state->end) ? *reinterpret_cast(p) : 0; + state->bitbuf.y = (p < state->end) ? 
*reinterpret_cast(p) : 0; state->bitpos = prefix_bytes * 8; } __syncthreads(); @@ -1132,9 +1133,9 @@ __global__ void __launch_bounds__(block_size) // Output buffer too small state->err = 1; } - outputs[z].bytes_written = state->out - state->outbase; - outputs[z].status = state->err; - outputs[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes + statuses[z].bytes_written = state->out - state->outbase; + statuses[z].status = state->err; + statuses[z].reserved = (int)(state->end - state->cur); // Here mainly for debug purposes } } @@ -1145,7 +1146,9 @@ __global__ void __launch_bounds__(block_size) * * @param inputs Source and destination information per block */ -__global__ void __launch_bounds__(1024) copy_uncompressed_kernel(gpu_inflate_input_s* inputs) +__global__ void __launch_bounds__(1024) + copy_uncompressed_kernel(device_span const> inputs, + device_span const> outputs) { __shared__ const uint8_t* volatile src_g; __shared__ uint8_t* volatile dst_g; @@ -1158,9 +1161,9 @@ __global__ void __launch_bounds__(1024) copy_uncompressed_kernel(gpu_inflate_inp uint32_t len, src_align_bytes, src_align_bits, dst_align_bytes; if (!t) { - src = static_cast(inputs[z].srcDevice); - dst = static_cast(inputs[z].dstDevice); - len = min((uint32_t)inputs[z].srcSize, (uint32_t)inputs[z].dstSize); + src = inputs[z].data(); + dst = outputs[z].data(); + len = static_cast(min(inputs[z].size(), outputs[z].size())); src_g = src; dst_g = dst; copy_len_g = len; @@ -1195,26 +1198,26 @@ __global__ void __launch_bounds__(1024) copy_uncompressed_kernel(gpu_inflate_inp if (t < len) { dst[t] = src[t]; } } -cudaError_t __host__ gpuinflate(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - int count, - int parse_hdr, - rmm::cuda_stream_view stream) +void gpuinflate(device_span const> inputs, + device_span const> outputs, + device_span statuses, + gzip_header_included parse_hdr, + rmm::cuda_stream_view stream) { constexpr int block_size = 128; // Threads per block - if (count > 0) { + if (inputs.size() > 0) { inflate_kernel - <<>>(inputs, outputs, parse_hdr); + <<>>(inputs, outputs, statuses, parse_hdr); } - return cudaSuccess; } -cudaError_t __host__ gpu_copy_uncompressed_blocks(gpu_inflate_input_s* inputs, - int count, - rmm::cuda_stream_view stream) +void gpu_copy_uncompressed_blocks(device_span const> inputs, + device_span const> outputs, + rmm::cuda_stream_view stream) { - if (count > 0) { copy_uncompressed_kernel<<>>(inputs); } - return cudaSuccess; + if (inputs.size() > 0) { + copy_uncompressed_kernel<<>>(inputs, outputs); + } } } // namespace io diff --git a/cpp/src/io/comp/gpuinflate.h b/cpp/src/io/comp/gpuinflate.h index 29856bcd3f3..3870b2ac3b3 100644 --- a/cpp/src/io/comp/gpuinflate.h +++ b/cpp/src/io/comp/gpuinflate.h @@ -16,75 +16,70 @@ #pragma once -#include +#include #include +#include + namespace cudf { namespace io { -/** - * @brief Input parameters for the decompression interface - */ -struct gpu_inflate_input_s { - const void* srcDevice; - uint64_t srcSize; - void* dstDevice; - uint64_t dstSize; -}; /** * @brief Output parameters for the decompression interface */ -struct gpu_inflate_status_s { +struct decompress_status { uint64_t bytes_written; uint32_t status; uint32_t reserved; }; +enum class gzip_header_included { NO, YES }; + /** * @brief Interface for decompressing GZIP-compressed data * * Multiple, independent chunks of compressed data can be decompressed by using - * separate gpu_inflate_input_s/gpu_inflate_status_s pairs for each chunk. 
+ * separate input/output/status for each chunk. * - * @param[in] inputs List of input argument structures - * @param[out] outputs List of output status structures - * @param[in] count Number of input/output structures + * @param[in] inputs List of input buffers + * @param[out] outputs List of output buffers + * @param[out] statuses List of output status structures * @param[in] parse_hdr Whether or not to parse GZIP header * @param[in] stream CUDA stream to use */ -cudaError_t gpuinflate(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - int count, - int parse_hdr, - rmm::cuda_stream_view stream); +void gpuinflate(device_span const> inputs, + device_span const> outputs, + device_span statuses, + gzip_header_included parse_hdr, + rmm::cuda_stream_view stream); /** * @brief Interface for copying uncompressed byte blocks * - * @param[in] inputs List of input argument structures - * @param[in] count Number of input structures + * @param[in] inputs List of input buffers + * @param[out] outputs List of output buffers * @param[in] stream CUDA stream to use */ -cudaError_t gpu_copy_uncompressed_blocks(gpu_inflate_input_s* inputs, - int count, - rmm::cuda_stream_view stream); +void gpu_copy_uncompressed_blocks(device_span const> inputs, + device_span const> outputs, + rmm::cuda_stream_view stream); /** * @brief Interface for decompressing Snappy-compressed data * * Multiple, independent chunks of compressed data can be decompressed by using - * separate gpu_inflate_input_s/gpu_inflate_status_s pairs for each chunk. + * separate input/output/status for each chunk. * - * @param[in] inputs List of input argument structures - * @param[out] outputs List of output status structures - * @param[in] count Number of input/output structures + * @param[in] inputs List of input buffers + * @param[out] outputs List of output buffers + * @param[out] statuses List of output status structures * @param[in] stream CUDA stream to use */ -cudaError_t gpu_unsnap(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - int count, - rmm::cuda_stream_view stream); +void gpu_unsnap(device_span const> inputs, + device_span const> outputs, + device_span statuses, + rmm::cuda_stream_view stream); /** * @brief Computes the size of temporary memory for Brotli decompression @@ -99,37 +94,37 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs = 0); * @brief Interface for decompressing Brotli-compressed data * * Multiple, independent chunks of compressed data can be decompressed by using - * separate gpu_inflate_input_s/gpu_inflate_status_s pairs for each chunk. + * separate input/output/status pairs for each chunk. 
* - * @param[in] inputs List of input argument structures - * @param[out] outputs List of output status structures + * @param[in] inputs List of input buffers + * @param[out] outputs List of output buffers + * @param[out] statuses List of output status structures * @param[in] scratch Temporary memory for intermediate work * @param[in] scratch_size Size in bytes of the temporary memory - * @param[in] count Number of input/output structures * @param[in] stream CUDA stream to use */ -cudaError_t gpu_debrotli(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - void* scratch, - size_t scratch_size, - int count, - rmm::cuda_stream_view stream); +void gpu_debrotli(device_span const> inputs, + device_span const> outputs, + device_span statuses, + void* scratch, + size_t scratch_size, + rmm::cuda_stream_view stream); /** * @brief Interface for compressing data with Snappy * * Multiple, independent chunks of compressed data can be compressed by using - * separate gpu_inflate_input_s/gpu_inflate_status_s pairs for each chunk. + * separate input/output/status for each chunk. * - * @param[in] inputs List of input argument structures - * @param[out] outputs List of output status structures - * @param[in] count Number of input/output structures + * @param[in] inputs List of input buffers + * @param[out] outputs List of output buffers + * @param[out] statuses List of output status structures * @param[in] stream CUDA stream to use */ -cudaError_t gpu_snap(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - int count, - rmm::cuda_stream_view stream); +void gpu_snap(device_span const> inputs, + device_span const> outputs, + device_span statuses, + rmm::cuda_stream_view stream); } // namespace io } // namespace cudf diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp new file mode 100644 index 00000000000..b2e6f07b80b --- /dev/null +++ b/cpp/src/io/comp/nvcomp_adapter.cpp @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "nvcomp_adapter.hpp" +#include "nvcomp_adapter.cuh" + +#include + +#include + +namespace cudf::io::nvcomp { + +template +auto batched_decompress_get_temp_size(compression_type type, Args&&... args) +{ + switch (type) { + case compression_type::SNAPPY: + return nvcompBatchedSnappyDecompressGetTempSize(std::forward(args)...); + default: CUDF_FAIL("Unsupported compression type"); + } +}; + +template +auto batched_decompress_async(compression_type type, Args&&... 
args) +{ + switch (type) { + case compression_type::SNAPPY: + return nvcompBatchedSnappyDecompressAsync(std::forward(args)...); + default: CUDF_FAIL("Unsupported compression type"); + } +}; + +size_t get_temp_size(compression_type type, size_t num_chunks, size_t max_uncomp_chunk_size) +{ + size_t temp_size = 0; + nvcompStatus_t nvcomp_status = + batched_decompress_get_temp_size(type, num_chunks, max_uncomp_chunk_size, &temp_size); + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, + "Unable to get scratch size for decompression"); + + return temp_size; +} + +void batched_decompress(compression_type type, + device_span const> inputs, + device_span const> outputs, + device_span statuses, + size_t max_uncomp_chunk_size, + rmm::cuda_stream_view stream) +{ + auto const num_chunks = inputs.size(); + + // cuDF inflate inputs converted to nvcomp inputs + auto const nvcomp_args = create_batched_nvcomp_args(inputs, outputs, stream); + rmm::device_uvector actual_uncompressed_data_sizes(num_chunks, stream); + rmm::device_uvector nvcomp_statuses(num_chunks, stream); + // Temporary space required for decompression + rmm::device_buffer scratch(get_temp_size(type, num_chunks, max_uncomp_chunk_size), stream); + auto const nvcomp_status = batched_decompress_async(type, + nvcomp_args.compressed_data_ptrs.data(), + nvcomp_args.compressed_data_sizes.data(), + nvcomp_args.uncompressed_data_sizes.data(), + actual_uncompressed_data_sizes.data(), + num_chunks, + scratch.data(), + scratch.size(), + nvcomp_args.uncompressed_data_ptrs.data(), + nvcomp_statuses.data(), + stream.value()); + CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "unable to perform decompression"); + + convert_status(nvcomp_statuses, actual_uncompressed_data_sizes, statuses, stream); +} +} // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cu b/cpp/src/io/comp/nvcomp_adapter.cu new file mode 100644 index 00000000000..ce294cc9b00 --- /dev/null +++ b/cpp/src/io/comp/nvcomp_adapter.cu @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "nvcomp_adapter.cuh" + +#include + +#include + +namespace cudf::io::nvcomp { + +batched_args create_batched_nvcomp_args(device_span const> inputs, + device_span const> outputs, + rmm::cuda_stream_view stream) +{ + size_t num_comp_pages = inputs.size(); + rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); + rmm::device_uvector compressed_data_sizes(num_comp_pages, stream); + rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); + rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); + + // Prepare the input vectors + auto ins_it = + thrust::make_zip_iterator(compressed_data_ptrs.begin(), compressed_data_sizes.begin()); + thrust::transform( + rmm::exec_policy(stream), inputs.begin(), inputs.end(), ins_it, [] __device__(auto const& in) { + return thrust::make_tuple(in.data(), in.size()); + }); + + // Prepare the output vectors + auto outs_it = + thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin()); + thrust::transform( + rmm::exec_policy(stream), + outputs.begin(), + outputs.end(), + outs_it, + [] __device__(auto const& out) { return thrust::make_tuple(out.data(), out.size()); }); + + return {std::move(compressed_data_ptrs), + std::move(compressed_data_sizes), + std::move(uncompressed_data_ptrs), + std::move(uncompressed_data_sizes)}; +} + +void convert_status(device_span nvcomp_stats, + device_span actual_uncompressed_sizes, + device_span cudf_stats, + rmm::cuda_stream_view stream) +{ + thrust::transform( + rmm::exec_policy(stream), + nvcomp_stats.begin(), + nvcomp_stats.end(), + actual_uncompressed_sizes.begin(), + cudf_stats.begin(), + [] __device__(auto const& status, auto const& size) { + return decompress_status{size, status == nvcompStatus_t::nvcompSuccess ? 0u : 1u}; + }); +} +} // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.cuh b/cpp/src/io/comp/nvcomp_adapter.cuh new file mode 100644 index 00000000000..a76ddcf6813 --- /dev/null +++ b/cpp/src/io/comp/nvcomp_adapter.cuh @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "gpuinflate.h" + +#include + +#include + +#include +#include + +namespace cudf::io::nvcomp { + +struct batched_args { + rmm::device_uvector compressed_data_ptrs; + rmm::device_uvector compressed_data_sizes; + rmm::device_uvector uncompressed_data_ptrs; + rmm::device_uvector uncompressed_data_sizes; +}; + +/** + * @brief Split lists of src/dst device spans into lists of pointers/sizes. + * + * @param[in] inputs List of input buffers + * @param[in] outputs List of output buffers + * @param[in] stream CUDA stream to use + */ +batched_args create_batched_nvcomp_args(device_span const> inputs, + device_span const> outputs, + rmm::cuda_stream_view stream); + +/** + * @brief Convert nvcomp statuses into cuIO compression statuses. 
+ */ +void convert_status(device_span nvcomp_stats, + device_span actual_uncompressed_sizes, + device_span cudf_stats, + rmm::cuda_stream_view stream); +} // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/nvcomp_adapter.hpp b/cpp/src/io/comp/nvcomp_adapter.hpp new file mode 100644 index 00000000000..a0eb6bc4fbf --- /dev/null +++ b/cpp/src/io/comp/nvcomp_adapter.hpp @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "gpuinflate.h" + +#include + +#include + +namespace cudf::io::nvcomp { + +enum class compression_type { SNAPPY }; + +/** + * @brief Device batch decompression of given type. + * + * @param[in] type Compression type + * @param[in] inputs List of input buffers + * @param[out] outputs List of output buffers + * @param[out] statuses List of output status structures + * @param[in] max_uncomp_page_size maximum size of uncompressed block + * @param[in] stream CUDA stream to use + */ +void batched_decompress(compression_type type, + device_span const> inputs, + device_span const> outputs, + device_span statuses, + size_t max_uncomp_page_size, + rmm::cuda_stream_view stream); +} // namespace cudf::io::nvcomp diff --git a/cpp/src/io/comp/snap.cu b/cpp/src/io/comp/snap.cu index 9f0a610f8f7..d64eea06631 100644 --- a/cpp/src/io/comp/snap.cu +++ b/cpp/src/io/comp/snap.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
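
All nvcomp batched decompression now goes through the adapter declared in nvcomp_adapter.hpp above. A usage sketch mirroring the ORC reader call further down in this patch; the underlying inflate_in/inflate_out/inflate_stats device vectors are assumed to already hold per-block {pointer, size} pairs:

// Sketch: decompress a batch of Snappy blocks via the new nvcomp adapter.
device_span<device_span<uint8_t const>> inputs_view{inflate_in.data(), num_compressed_blocks};
device_span<device_span<uint8_t>> outputs_view{inflate_out.data(), num_compressed_blocks};
device_span<decompress_status> statuses_view{inflate_stats.data(), num_compressed_blocks};

nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
                           inputs_view,
                           outputs_view,
                           statuses_view,
                           max_uncomp_block_size,  // largest possible uncompressed block
                           stream);
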
@@ -258,7 +258,9 @@ static __device__ uint32_t Match60(const uint8_t* src1, * @param[in] count Number of blocks to compress */ __global__ void __launch_bounds__(128) - snap_kernel(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs, int count) + snap_kernel(device_span const> inputs, + device_span const> outputs, + device_span statuses) { __shared__ __align__(16) snap_state_s state_g; @@ -268,15 +270,15 @@ __global__ void __launch_bounds__(128) const uint8_t* src; if (!t) { - const auto* src = static_cast(inputs[blockIdx.x].srcDevice); - auto src_len = static_cast(inputs[blockIdx.x].srcSize); - auto* dst = static_cast(inputs[blockIdx.x].dstDevice); - auto dst_len = static_cast(inputs[blockIdx.x].dstSize); - uint8_t* end = dst + dst_len; - s->src = src; - s->src_len = src_len; - s->dst_base = dst; - s->end = end; + auto const src = inputs[blockIdx.x].data(); + auto src_len = static_cast(inputs[blockIdx.x].size()); + auto dst = outputs[blockIdx.x].data(); + auto const dst_len = static_cast(outputs[blockIdx.x].size()); + auto const end = dst + dst_len; + s->src = src; + s->src_len = src_len; + s->dst_base = dst; + s->end = end; while (src_len > 0x7f) { if (dst < end) { dst[0] = src_len | 0x80; } dst++; @@ -335,23 +337,22 @@ __global__ void __launch_bounds__(128) } __syncthreads(); if (!t) { - outputs[blockIdx.x].bytes_written = s->dst - s->dst_base; - outputs[blockIdx.x].status = (s->dst > s->end) ? 1 : 0; - outputs[blockIdx.x].reserved = 0; + statuses[blockIdx.x].bytes_written = s->dst - s->dst_base; + statuses[blockIdx.x].status = (s->dst > s->end) ? 1 : 0; + statuses[blockIdx.x].reserved = 0; } } -cudaError_t __host__ gpu_snap(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - int count, - rmm::cuda_stream_view stream) +void gpu_snap(device_span const> inputs, + device_span const> outputs, + device_span statuses, + rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block - dim3 dim_grid(count, 1); - if (count > 0) { - snap_kernel<<>>(inputs, outputs, count); + dim3 dim_grid(inputs.size(), 1); + if (inputs.size() > 0) { + snap_kernel<<>>(inputs, outputs, statuses); } - return cudaSuccess; } } // namespace io diff --git a/cpp/src/io/comp/unsnap.cu b/cpp/src/io/comp/unsnap.cu index 791a16bc912..dc44b9fcd59 100644 --- a/cpp/src/io/comp/unsnap.cu +++ b/cpp/src/io/comp/unsnap.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2020, NVIDIA CORPORATION. + * Copyright (c) 2018-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
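
The compression and decompression kernels share the decompress_status convention: status == 0 on success, nonzero on failure, and in the deflate path a status of 1 (output buffer too small) leaves the required size in bytes_written. A small host-side check of the kind a caller might write — a hypothetical helper, not part of this patch:

#include <algorithm>

// Hypothetical helper: returns true when every block in the batch decompressed successfully.
bool all_blocks_succeeded(hostdevice_vector<decompress_status>& statuses,
                          rmm::cuda_stream_view stream)
{
  statuses.device_to_host(stream, true);  // copy results back and synchronize
  return std::all_of(statuses.begin(), statuses.end(),
                     [](decompress_status const& s) { return s.status == 0; });
}
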
@@ -64,14 +64,15 @@ struct unsnap_queue_s { * @brief snappy decompression state */ struct unsnap_state_s { - const uint8_t* base; ///< base ptr of compressed stream - const uint8_t* end; ///< end of compressed stream - uint32_t uncompressed_size; ///< uncompressed stream size - uint32_t bytes_left; ///< bytes to uncompressed remaining - int32_t error; ///< current error status - uint32_t tstart; ///< start time for perf logging - volatile unsnap_queue_s q; ///< queue for cross-warp communication - gpu_inflate_input_s in; ///< input parameters for current block + const uint8_t* base; ///< base ptr of compressed stream + const uint8_t* end; ///< end of compressed stream + uint32_t uncompressed_size; ///< uncompressed stream size + uint32_t bytes_left; ///< remaining bytes to decompress + int32_t error; ///< current error status + uint32_t tstart; ///< start time for perf logging + volatile unsnap_queue_s q; ///< queue for cross-warp communication + device_span src; ///< input for current block + device_span dst; ///< output for current block }; inline __device__ volatile uint8_t& byte_access(unsnap_state_s* s, uint32_t pos) @@ -497,9 +498,9 @@ __device__ void snappy_decode_symbols(unsnap_state_s* s, uint32_t t) template __device__ void snappy_process_symbols(unsnap_state_s* s, int t, Storage& temp_storage) { - const uint8_t* literal_base = s->base; - auto* out = static_cast(s->in.dstDevice); - int batch = 0; + auto const literal_base = s->base; + auto out = s->dst.data(); + int batch = 0; do { volatile unsnap_batch_s* b = &s->q.batch[batch * batch_size]; @@ -624,7 +625,9 @@ __device__ void snappy_process_symbols(unsnap_state_s* s, int t, Storage& temp_s */ template __global__ void __launch_bounds__(block_size) - unsnap_kernel(gpu_inflate_input_s* inputs, gpu_inflate_status_s* outputs) + unsnap_kernel(device_span const> inputs, + device_span const> outputs, + device_span statuses) { __shared__ __align__(16) unsnap_state_s state_g; __shared__ cub::WarpReduce::TempStorage temp_storage; @@ -632,16 +635,14 @@ __global__ void __launch_bounds__(block_size) unsnap_state_s* s = &state_g; int strm_id = blockIdx.x; - if (t < sizeof(gpu_inflate_input_s) / sizeof(uint32_t)) { - reinterpret_cast(&s->in)[t] = reinterpret_cast(&inputs[strm_id])[t]; - __threadfence_block(); - } if (t < batch_count) { s->q.batch_len[t] = 0; } __syncthreads(); if (!t) { - const auto* cur = static_cast(s->in.srcDevice); - const uint8_t* end = cur + s->in.srcSize; - s->error = 0; + s->src = inputs[strm_id]; + s->dst = outputs[strm_id]; + auto cur = s->src.begin(); + auto const end = s->src.end(); + s->error = 0; if (log_cyclecount) { s->tstart = clock(); } if (cur < end) { // Read uncompressed size (varint), limited to 32-bit @@ -672,7 +673,7 @@ __global__ void __launch_bounds__(block_size) s->bytes_left = uncompressed_size; s->base = cur; s->end = end; - if ((cur >= end && uncompressed_size != 0) || (uncompressed_size > s->in.dstSize)) { + if ((cur >= end && uncompressed_size != 0) || (uncompressed_size > s->dst.size())) { s->error = -1; } } else { @@ -697,28 +698,25 @@ __global__ void __launch_bounds__(block_size) __syncthreads(); } if (!t) { - outputs[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; - outputs[strm_id].status = s->error; + statuses[strm_id].bytes_written = s->uncompressed_size - s->bytes_left; + statuses[strm_id].status = s->error; if (log_cyclecount) { - outputs[strm_id].reserved = clock() - s->tstart; + statuses[strm_id].reserved = clock() - s->tstart; } else { - outputs[strm_id].reserved = 0; 
+ statuses[strm_id].reserved = 0; } } } -cudaError_t __host__ gpu_unsnap(gpu_inflate_input_s* inputs, - gpu_inflate_status_s* outputs, - int count, - rmm::cuda_stream_view stream) +void gpu_unsnap(device_span const> inputs, + device_span const> outputs, + device_span statuses, + rmm::cuda_stream_view stream) { - uint32_t count32 = (count > 0) ? count : 0; - dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block - dim3 dim_grid(count32, 1); // TODO: Check max grid dimensions vs max expected count - - unsnap_kernel<128><<>>(inputs, outputs); + dim3 dim_block(128, 1); // 4 warps per stream, 1 stream per block + dim3 dim_grid(inputs.size(), 1); // TODO: Check max grid dimensions vs max expected count - return cudaSuccess; + unsnap_kernel<128><<>>(inputs, outputs, statuses); } } // namespace io diff --git a/cpp/src/io/orc/orc_gpu.h b/cpp/src/io/orc/orc_gpu.h index d94aa00c7b9..837fd03a112 100644 --- a/cpp/src/io/orc/orc_gpu.h +++ b/cpp/src/io/orc/orc_gpu.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,9 +43,10 @@ struct CompressedStreamInfo { : compressed_data(compressed_data_), uncompressed_data(nullptr), compressed_data_size(compressed_size_), - decctl(nullptr), - decstatus(nullptr), - copyctl(nullptr), + dec_in_ctl(nullptr), + dec_out_ctl(nullptr), + copy_in_ctl(nullptr), + copy_out_ctl(nullptr), num_compressed_blocks(0), num_uncompressed_blocks(0), max_uncompressed_size(0), @@ -54,14 +55,15 @@ struct CompressedStreamInfo { } const uint8_t* compressed_data; // [in] base ptr to compressed stream data uint8_t* uncompressed_data; // [in] base ptr to uncompressed stream data or NULL if not known yet - size_t compressed_data_size; // [in] compressed data size for this stream - gpu_inflate_input_s* decctl; // [in] base ptr to decompression structure to be filled - gpu_inflate_status_s* decstatus; // [in] results of decompression - gpu_inflate_input_s* - copyctl; // [in] base ptr to copy structure to be filled for uncompressed blocks + size_t compressed_data_size; // [in] compressed data size for this stream + device_span* dec_in_ctl; // [in] input buffer to decompress + device_span* dec_out_ctl; // [in] output buffer to decompress into + device_span decstatus; // [in] results of decompression + device_span* copy_in_ctl; // [out] input buffer to copy + device_span* copy_out_ctl; // [out] output buffer to copy to uint32_t num_compressed_blocks; // [in,out] number of entries in decctl(in), number of compressed // blocks(out) - uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of + uint32_t num_uncompressed_blocks; // [in,out] number of entries in dec_in_ctl(in), number of // uncompressed blocks(out) uint64_t max_uncompressed_size; // [out] maximum uncompressed data size of stream uint32_t max_uncompressed_block_size; // [out] maximum uncompressed size of any block in stream @@ -345,8 +347,9 @@ void CompactOrcDataStreams(device_2dspan strm_desc, * @param[in] max_comp_blk_size Max size of any block after compression * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in,out] enc_streams chunk streams device array [column][rowgroup] - * @param[out] comp_in Per-block compression input parameters - * @param[out] comp_out Per-block compression status + * @param[out] comp_in Per-block compression input buffers + * 
@param[out] comp_out Per-block compression output buffers + * @param[out] comp_stat Per-block compression status * @param[in] stream CUDA stream used for device memory operations and kernel launches */ void CompressOrcDataStreams(uint8_t* compressed_data, @@ -356,8 +359,9 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, - device_span comp_in, - device_span comp_out, + device_span> comp_in, + device_span> comp_out, + device_span comp_stat, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index a768d568178..139eb28d1a1 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -25,6 +25,7 @@ #include "timezone.cuh" #include +#include #include #include @@ -40,8 +41,6 @@ #include #include -#include - #include #include #include @@ -262,7 +261,7 @@ auto decimal_column_type(std::vector const& decimal128_columns, } // namespace -__global__ void decompress_check_kernel(device_span stats, +__global__ void decompress_check_kernel(device_span stats, bool* any_block_failure) { auto tid = blockIdx.x * blockDim.x + threadIdx.x; @@ -273,7 +272,7 @@ __global__ void decompress_check_kernel(device_span } } -void decompress_check(device_span stats, +void decompress_check(device_span stats, bool* any_block_failure, rmm::cuda_stream_view stream) { @@ -284,74 +283,6 @@ void decompress_check(device_span stats, decompress_check_kernel<<>>(stats, any_block_failure); } -__global__ void convert_nvcomp_status(device_span nvcomp_stats, - device_span actual_uncompressed_sizes, - device_span stats) -{ - auto tid = blockIdx.x * blockDim.x + threadIdx.x; - if (tid < stats.size()) { - stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 
0 : 1; - stats[tid].bytes_written = actual_uncompressed_sizes[tid]; - } -} - -void snappy_decompress(device_span comp_in, - device_span comp_stat, - size_t max_uncomp_page_size, - rmm::cuda_stream_view stream) -{ - size_t num_blocks = comp_in.size(); - size_t temp_size; - - auto status = - nvcompBatchedSnappyDecompressGetTempSize(num_blocks, max_uncomp_page_size, &temp_size); - CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, - "Unable to get scratch size for snappy decompression"); - - rmm::device_buffer scratch(temp_size, stream); - rmm::device_uvector compressed_data_ptrs(num_blocks, stream); - rmm::device_uvector compressed_data_sizes(num_blocks, stream); - rmm::device_uvector uncompressed_data_ptrs(num_blocks, stream); - rmm::device_uvector uncompressed_data_sizes(num_blocks, stream); - - rmm::device_uvector actual_uncompressed_data_sizes(num_blocks, stream); - rmm::device_uvector statuses(num_blocks, stream); - - device_span actual_uncompressed_sizes_span(actual_uncompressed_data_sizes.data(), - actual_uncompressed_data_sizes.size()); - device_span statuses_span(statuses.data(), statuses.size()); - - // Prepare the vectors - auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(), - compressed_data_sizes.begin(), - uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.data()); - thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); - }); - - status = nvcompBatchedSnappyDecompressAsync(compressed_data_ptrs.data(), - compressed_data_sizes.data(), - uncompressed_data_sizes.data(), - actual_uncompressed_data_sizes.data(), - num_blocks, - scratch.data(), - scratch.size(), - uncompressed_data_ptrs.data(), - statuses.data(), - stream.value()); - CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression"); - - dim3 block(128); - dim3 grid(cudf::util::div_rounding_up_safe(num_blocks, static_cast(block.x))); - convert_nvcomp_status<<>>( - statuses_span, actual_uncompressed_sizes_span, comp_stat); -} - rmm::device_buffer reader::impl::decompress_stripe_data( cudf::detail::hostdevice_2dvector& chunks, const std::vector& stripe_data, @@ -396,9 +327,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data( CUDF_EXPECTS(total_decomp_size > 0, "No decompressible data found"); rmm::device_buffer decomp_data(total_decomp_size, stream); - rmm::device_uvector inflate_in( + rmm::device_uvector> inflate_in( + num_compressed_blocks + num_uncompressed_blocks, stream); + rmm::device_uvector> inflate_out( num_compressed_blocks + num_uncompressed_blocks, stream); - rmm::device_uvector inflate_out(num_compressed_blocks, stream); + rmm::device_uvector inflate_stats(num_compressed_blocks, stream); // Parse again to populate the decompression input/output buffers size_t decomp_offset = 0; @@ -408,9 +341,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data( for (size_t i = 0; i < compinfo.size(); ++i) { auto dst_base = static_cast(decomp_data.data()); compinfo[i].uncompressed_data = dst_base + decomp_offset; - compinfo[i].decctl = inflate_in.data() + start_pos; - compinfo[i].decstatus = inflate_out.data() + start_pos; - compinfo[i].copyctl = inflate_in.data() + start_pos_uncomp; + compinfo[i].dec_in_ctl = inflate_in.data() + start_pos; + compinfo[i].dec_out_ctl = inflate_out.data() + start_pos; + compinfo[i].decstatus = {inflate_stats.data() + start_pos, 
compinfo[i].num_compressed_blocks}; + compinfo[i].copy_in_ctl = inflate_in.data() + start_pos_uncomp; + compinfo[i].copy_out_ctl = inflate_out.data() + start_pos_uncomp; stream_info[i].dst_pos = decomp_offset; decomp_offset += compinfo[i].max_uncompressed_size; @@ -428,29 +363,36 @@ rmm::device_buffer reader::impl::decompress_stripe_data( // Dispatch batches of blocks to decompress if (num_compressed_blocks > 0) { - device_span inflate_out_view(inflate_out.data(), num_compressed_blocks); + device_span> inflate_in_view{inflate_in.data(), + num_compressed_blocks}; + device_span> inflate_out_view{inflate_out.data(), num_compressed_blocks}; switch (decompressor->GetKind()) { case orc::ZLIB: - CUDF_CUDA_TRY( - gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream)); + gpuinflate( + inflate_in_view, inflate_out_view, inflate_stats, gzip_header_included::NO, stream); break; case orc::SNAPPY: if (nvcomp_integration::is_stable_enabled()) { - device_span inflate_in_view{inflate_in.data(), - num_compressed_blocks}; - snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream); + nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, + inflate_in_view, + inflate_out_view, + inflate_stats, + max_uncomp_block_size, + stream); } else { - CUDF_CUDA_TRY( - gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream)); + gpu_unsnap(inflate_in_view, inflate_out_view, inflate_stats, stream); } break; default: CUDF_FAIL("Unexpected decompression dispatch"); break; } - decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream); + decompress_check(inflate_stats, any_block_failure.device_ptr(), stream); } if (num_uncompressed_blocks > 0) { - CUDF_CUDA_TRY(gpu_copy_uncompressed_blocks( - inflate_in.data() + num_compressed_blocks, num_uncompressed_blocks, stream)); + device_span> copy_in_view{inflate_in.data() + num_compressed_blocks, + num_uncompressed_blocks}; + device_span> copy_out_view{inflate_out.data() + num_compressed_blocks, + num_uncompressed_blocks}; + gpu_copy_uncompressed_blocks(copy_in_view, copy_out_view, stream); } gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream); diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index f1d524058d2..3fe623be5b1 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -1141,8 +1141,9 @@ __global__ void __launch_bounds__(1024) * * @param[in] strm_desc StripeStream device array [stripe][stream] * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[out] comp_in Per-block compression input parameters - * @param[out] comp_out Per-block compression status + * @param[out] inputs Per-block compression input buffers + * @param[out] outputs Per-block compression output buffers + * @param[out] statuses Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression @@ -1151,8 +1152,9 @@ __global__ void __launch_bounds__(1024) __global__ void __launch_bounds__(256) gpuInitCompressionBlocks(device_2dspan strm_desc, device_2dspan streams, // const? 
- device_span comp_in, - device_span comp_out, + device_span> inputs, + device_span> outputs, + device_span statuses, uint8_t* compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) @@ -1175,16 +1177,11 @@ __global__ void __launch_bounds__(256) dst = compressed_bfr + ss.bfr_offset; num_blocks = (ss.stream_size > 0) ? (ss.stream_size - 1) / comp_blk_size + 1 : 1; for (uint32_t b = t; b < num_blocks; b += 256) { - gpu_inflate_input_s* blk_in = &comp_in[ss.first_block + b]; - gpu_inflate_status_s* blk_out = &comp_out[ss.first_block + b]; uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); - blk_in->srcDevice = src + b * comp_blk_size; - blk_in->srcSize = blk_size; - blk_in->dstDevice = dst + b * (BLOCK_HEADER_SIZE + max_comp_blk_size) + BLOCK_HEADER_SIZE; - blk_in->dstSize = max_comp_blk_size; - blk_out->bytes_written = blk_size; - blk_out->status = 1; - blk_out->reserved = 0; + inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size}; + outputs[ss.first_block + b] = { + dst + b * (BLOCK_HEADER_SIZE + max_comp_blk_size) + BLOCK_HEADER_SIZE, max_comp_blk_size}; + statuses[ss.first_block + b] = {blk_size, 1, 0}; } } @@ -1194,8 +1191,9 @@ __global__ void __launch_bounds__(256) * * @param[in,out] strm_desc StripeStream device array [stripe][stream] * @param[in] chunks EncChunk device array [rowgroup][column] - * @param[in] comp_in Per-block compression input parameters - * @param[in] comp_out Per-block compression status + * @param[out] inputs Per-block compression input buffers + * @param[out] outputs Per-block compression output buffers + * @param[out] statuses Per-block compression status * @param[in] compressed_bfr Compression output buffer * @param[in] comp_blk_size Compression block size * @param[in] max_comp_blk_size Max size of any block after compression @@ -1203,8 +1201,9 @@ __global__ void __launch_bounds__(256) // blockDim {1024,1,1} __global__ void __launch_bounds__(1024) gpuCompactCompressedBlocks(device_2dspan strm_desc, - device_span comp_in, - device_span comp_out, + device_span const> inputs, + device_span const> outputs, + device_span statuses, uint8_t* compressed_bfr, uint32_t comp_blk_size, uint32_t max_comp_blk_size) @@ -1228,21 +1227,21 @@ __global__ void __launch_bounds__(1024) b = 0; do { if (t == 0) { - gpu_inflate_input_s* blk_in = &comp_in[ss.first_block + b]; - gpu_inflate_status_s* blk_out = &comp_out[ss.first_block + b]; - uint32_t src_len = + auto const src_len = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size)); - uint32_t dst_len = (blk_out->status == 0) ? blk_out->bytes_written : src_len; - uint32_t blk_size24; + auto dst_len = (statuses[ss.first_block + b].status == 0) + ? 
statuses[ss.first_block + b].bytes_written + : src_len; + uint32_t blk_size24{}; if (dst_len >= src_len) { // Copy from uncompressed source - src = static_cast(blk_in->srcDevice); - blk_out->bytes_written = src_len; - dst_len = src_len; - blk_size24 = dst_len * 2 + 1; + src = inputs[ss.first_block + b].data(); + statuses[ss.first_block + b].bytes_written = src_len; + dst_len = src_len; + blk_size24 = dst_len * 2 + 1; } else { // Compressed block - src = static_cast(blk_in->dstDevice); + src = outputs[ss.first_block + b].data(); blk_size24 = dst_len * 2 + 0; } dst[0] = static_cast(blk_size24 >> 0); @@ -1311,14 +1310,21 @@ void CompressOrcDataStreams(uint8_t* compressed_data, uint32_t max_comp_blk_size, device_2dspan strm_desc, device_2dspan enc_streams, - device_span comp_in, - device_span comp_out, + device_span> comp_in, + device_span> comp_out, + device_span comp_stat, rmm::cuda_stream_view stream) { dim3 dim_block_init(256, 1); dim3 dim_grid(strm_desc.size().first, strm_desc.size().second); - gpuInitCompressionBlocks<<>>( - strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); + gpuInitCompressionBlocks<<>>(strm_desc, + enc_streams, + comp_in, + comp_out, + comp_stat, + compressed_data, + comp_blk_size, + max_comp_blk_size); if (compression == SNAPPY) { if (detail::nvcomp_integration::is_stable_enabled()) { try { @@ -1336,15 +1342,18 @@ void CompressOrcDataStreams(uint8_t* compressed_data, rmm::device_uvector compressed_bytes_written(num_compressed_blocks, stream); auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.begin(), - compressed_data_ptrs.begin()); + uncompressed_data_sizes.begin()); + thrust::transform( + rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(auto const& in) { return thrust::make_tuple(in.data(), in.size()); }); thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); - }); + comp_out.begin(), + comp_out.end(), + compressed_data_ptrs.begin(), + [] __device__(auto const& out) { return out.data(); }); nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), uncompressed_data_sizes.data(), max_comp_blk_size, @@ -1361,9 +1370,9 @@ void CompressOrcDataStreams(uint8_t* compressed_data, thrust::transform(rmm::exec_policy(stream), compressed_bytes_written.begin(), compressed_bytes_written.end(), - comp_out.begin(), + comp_stat.begin(), [] __device__(size_t size) { - gpu_inflate_status_s status{}; + decompress_status status{}; status.bytes_written = size; return status; }); @@ -1371,18 +1380,18 @@ void CompressOrcDataStreams(uint8_t* compressed_data, // If we reach this then there was an error in compressing so set an error status for each // block thrust::for_each(rmm::exec_policy(stream), - comp_out.begin(), - comp_out.end(), - [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + comp_stat.begin(), + comp_stat.end(), + [] __device__(decompress_status & stat) { stat.status = 1; }); }; } else { - gpu_snap(comp_in.data(), comp_out.data(), num_compressed_blocks, stream); + gpu_snap(comp_in, comp_out, comp_stat, stream); } } dim3 dim_block_compact(1024, 1); gpuCompactCompressedBlocks<<>>( - strm_desc, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size); + strm_desc, comp_in, comp_out, comp_stat, compressed_data, comp_blk_size, max_comp_blk_size); } } // 
namespace gpu diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 276a1f49abf..e44ca10922f 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -26,9 +26,16 @@ namespace cudf { namespace io { namespace orc { namespace gpu { + +struct comp_in_out { + uint8_t const* in_ptr; + size_t in_size; + uint8_t* out_ptr; + size_t out_size; +}; struct compressed_stream_s { CompressedStreamInfo info; - gpu_inflate_input_s ctl; + comp_in_out ctl; }; // blockDim {128,1,1} @@ -57,7 +64,8 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); uint32_t is_uncompressed = block_len & 1; uint32_t uncompressed_size; - gpu_inflate_input_s* init_ctl = nullptr; + device_span* init_in_ctl = nullptr; + device_span* init_out_ctl = nullptr; block_len >>= 1; cur += BLOCK_HEADER_SIZE; if (block_len > block_size || cur + block_len > end) { @@ -82,27 +90,34 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat uncompressed[max_uncompressed_size + lane_id] = cur[lane_id]; } } else { - init_ctl = s->info.copyctl; - init_ctl = (init_ctl && num_uncompressed_blocks < s->info.num_uncompressed_blocks) - ? &init_ctl[num_uncompressed_blocks] - : nullptr; + init_in_ctl = + (s->info.copy_in_ctl && num_uncompressed_blocks < s->info.num_uncompressed_blocks) + ? &s->info.copy_in_ctl[num_uncompressed_blocks] + : nullptr; + init_out_ctl = + (s->info.copy_out_ctl && num_uncompressed_blocks < s->info.num_uncompressed_blocks) + ? &s->info.copy_out_ctl[num_uncompressed_blocks] + : nullptr; num_uncompressed_blocks++; } } else { - init_ctl = s->info.decctl; - init_ctl = (init_ctl && num_compressed_blocks < s->info.num_compressed_blocks) - ? &init_ctl[num_compressed_blocks] - : nullptr; + init_in_ctl = (s->info.dec_in_ctl && num_compressed_blocks < s->info.num_compressed_blocks) + ? &s->info.dec_in_ctl[num_compressed_blocks] + : nullptr; + init_out_ctl = + (s->info.dec_out_ctl && num_compressed_blocks < s->info.num_compressed_blocks) + ? 
&s->info.dec_out_ctl[num_compressed_blocks] + : nullptr; num_compressed_blocks++; } - if (!lane_id && init_ctl) { - s->ctl.srcDevice = const_cast(cur); - s->ctl.srcSize = block_len; - s->ctl.dstDevice = uncompressed + max_uncompressed_size; - s->ctl.dstSize = uncompressed_size; + if (!lane_id && init_in_ctl) { + s->ctl = {cur, block_len, uncompressed + max_uncompressed_size, uncompressed_size}; } __syncwarp(); - if (init_ctl && lane_id == 0) *init_ctl = s->ctl; + if (init_in_ctl && lane_id == 0) { + *init_in_ctl = {s->ctl.in_ptr, s->ctl.in_size}; + *init_out_ctl = {s->ctl.out_ptr, s->ctl.out_size}; + } cur += block_len; max_uncompressed_size += uncompressed_size; max_uncompressed_block_size = max(max_uncompressed_block_size, uncompressed_size); @@ -137,14 +152,14 @@ extern "C" __global__ void __launch_bounds__(128, 8) s->info.num_compressed_blocks + s->info.num_uncompressed_blocks > 0 && s->info.max_uncompressed_size > 0) { // Walk through the compressed blocks - const uint8_t* cur = s->info.compressed_data; - const uint8_t* end = cur + s->info.compressed_data_size; - const gpu_inflate_input_s* dec_in = s->info.decctl; - const gpu_inflate_status_s* dec_out = s->info.decstatus; - uint8_t* uncompressed_actual = s->info.uncompressed_data; - uint8_t* uncompressed_estimated = uncompressed_actual; - uint32_t num_compressed_blocks = 0; - uint32_t max_compressed_blocks = s->info.num_compressed_blocks; + const uint8_t* cur = s->info.compressed_data; + const uint8_t* end = cur + s->info.compressed_data_size; + auto dec_out = s->info.dec_out_ctl; + auto dec_status = s->info.decstatus; + uint8_t* uncompressed_actual = s->info.uncompressed_data; + uint8_t* uncompressed_estimated = uncompressed_actual; + uint32_t num_compressed_blocks = 0; + uint32_t max_compressed_blocks = s->info.num_compressed_blocks; while (cur + BLOCK_HEADER_SIZE < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); @@ -158,14 +173,14 @@ extern "C" __global__ void __launch_bounds__(128, 8) uncompressed_size_actual = block_len; } else { if (num_compressed_blocks > max_compressed_blocks) { break; } - if (shuffle((lane_id == 0) ? dec_out[num_compressed_blocks].status : 0) != 0) { + if (shuffle((lane_id == 0) ? dec_status[num_compressed_blocks].status : 0) != 0) { // Decompression failed, not much point in doing anything else break; } - uncompressed_size_est = - shuffle((lane_id == 0) ? *(const uint32_t*)&dec_in[num_compressed_blocks].dstSize : 0); - uncompressed_size_actual = shuffle( - (lane_id == 0) ? *(const uint32_t*)&dec_out[num_compressed_blocks].bytes_written : 0); + uint32_t const dst_size = dec_out[num_compressed_blocks].size(); + uncompressed_size_est = shuffle((lane_id == 0) ? dst_size : 0); + uint32_t const bytes_written = dec_status[num_compressed_blocks].bytes_written; + uncompressed_size_actual = shuffle((lane_id == 0) ? bytes_written : 0); } // In practice, this should never happen with a well-behaved writer, as we would expect the // uncompressed size to always be equal to the compression block size except for the last @@ -360,11 +375,11 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, if (strm_len > 0) { int32_t compressed_offset = (t < num_rowgroups) ? 
s->compressed_offset[t][ci_id] : 0; if (compressed_offset > 0) { - const uint8_t* start = s->strm_info[ci_id].compressed_data; - const uint8_t* cur = start; - const uint8_t* end = cur + s->strm_info[ci_id].compressed_data_size; - gpu_inflate_status_s* decstatus = s->strm_info[ci_id].decstatus; - uint32_t uncomp_offset = 0; + const uint8_t* start = s->strm_info[ci_id].compressed_data; + const uint8_t* cur = start; + const uint8_t* end = cur + s->strm_info[ci_id].compressed_data_size; + auto decstatus = s->strm_info[ci_id].decstatus.data(); + uint32_t uncomp_offset = 0; for (;;) { uint32_t block_len, is_uncompressed; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index 779d0390751..ecd2d6f6ec0 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1314,7 +1314,7 @@ void writer::impl::write_index_stream(int32_t stripe_id, file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, - host_span comp_out, + host_span comp_out, std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, @@ -2050,8 +2050,9 @@ void writer::impl::write(table_view const& table) // Compress the data streams rmm::device_buffer compressed_data(compressed_bfr_size, stream); - hostdevice_vector comp_out(num_compressed_blocks, stream); - hostdevice_vector comp_in(num_compressed_blocks, stream); + hostdevice_vector> comp_in(num_compressed_blocks, stream); + hostdevice_vector> comp_out(num_compressed_blocks, stream); + hostdevice_vector comp_stats(num_compressed_blocks, stream); if (compression_kind_ != NONE) { strm_descs.host_to_device(stream); gpu::CompressOrcDataStreams(static_cast(compressed_data.data()), @@ -2063,9 +2064,10 @@ void writer::impl::write(table_view const& table) enc_data.streams, comp_in, comp_out, + comp_stats, stream); strm_descs.device_to_host(stream); - comp_out.device_to_host(stream, true); + comp_stats.device_to_host(stream, true); } ProtobufWriter pbw_(&buffer_); @@ -2097,7 +2099,7 @@ void writer::impl::write(table_view const& table) segmentation, enc_data.streams, strm_descs, - comp_out, + comp_stats, intermediate_stats.rowgroup_blobs, &stripe, &streams, diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index 5f981793762..d823c73007f 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -386,7 +386,7 @@ class writer::impl { file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, - host_span comp_out, + host_span comp_out, std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 61bd29399cd..f05f0af2a79 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -81,8 +81,6 @@ struct page_enc_state_s { EncPage page; EncColumnChunk ck; parquet_column_device_view col; - gpu_inflate_input_s comp_in; - gpu_inflate_status_s comp_stat; uint16_t vals[rle_buffer_size]; }; @@ -750,8 +748,9 @@ static __device__ std::pair convert_nanoseconds(timesta template __global__ void __launch_bounds__(128, 8) gpuEncodePages(device_span pages, - device_span comp_in, - device_span comp_stat) + device_span> comp_in, + device_span> comp_out, + device_span comp_stats) { __shared__ __align__(8) page_enc_state_s state_g; using block_scan = cub::BlockScan; @@ -761,6 +760,7 @@ __global__ void __launch_bounds__(128, 8) uint32_t t = threadIdx.x; if (t == 0) { + state_g = page_enc_state_s{}; s->page = pages[blockIdx.x]; 
s->ck = *s->page.chunk; s->col = *s->ck.col_desc; @@ -1085,21 +1085,14 @@ __global__ void __launch_bounds__(128, 8) auto actual_data_size = static_cast(s->cur - base); uint32_t compressed_bfr_size = GetMaxCompressedBfrSize(actual_data_size); s->page.max_data_size = actual_data_size; - s->comp_in.srcDevice = base; - s->comp_in.srcSize = actual_data_size; - s->comp_in.dstDevice = s->page.compressed_data + s->page.max_hdr_size; - s->comp_in.dstSize = compressed_bfr_size; - s->comp_stat.bytes_written = 0; - s->comp_stat.status = ~0; - s->comp_stat.reserved = 0; - } - __syncthreads(); - if (t == 0) { + if (not comp_in.empty()) { + comp_in[blockIdx.x] = {base, actual_data_size}; + comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size, compressed_bfr_size}; + } pages[blockIdx.x] = s->page; - if (not comp_in.empty()) comp_in[blockIdx.x] = s->comp_in; - if (not comp_stat.empty()) { - comp_stat[blockIdx.x] = s->comp_stat; - pages[blockIdx.x].comp_stat = &comp_stat[blockIdx.x]; + if (not comp_stats.empty()) { + comp_stats[blockIdx.x] = {0, ~0u}; + pages[blockIdx.x].comp_stat = &comp_stats[blockIdx.x]; } } } @@ -1317,7 +1310,7 @@ __device__ uint8_t* EncodeStatistics(uint8_t* start, // blockDim(128, 1, 1) __global__ void __launch_bounds__(128) gpuEncodePageHeaders(device_span pages, - device_span comp_stat, + device_span comp_stat, device_span page_stats, const statistics_chunk* chunk_stats) { @@ -1946,14 +1939,15 @@ void InitEncoderPages(device_2dspan chunks, } void EncodePages(device_span pages, - device_span comp_in, - device_span comp_stat, + device_span> comp_in, + device_span> comp_out, + device_span comp_stats, rmm::cuda_stream_view stream) { auto num_pages = pages.size(); // A page is part of one column. This is launching 1 block per page. 1 block will exclusively // deal with one datatype. - gpuEncodePages<128><<>>(pages, comp_in, comp_stat); + gpuEncodePages<128><<>>(pages, comp_in, comp_out, comp_stats); } void DecideCompression(device_span chunks, rmm::cuda_stream_view stream) @@ -1962,7 +1956,7 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view } void EncodePageHeaders(device_span pages, - device_span comp_stat, + device_span comp_stats, device_span page_stats, const statistics_chunk* chunk_stats, rmm::cuda_stream_view stream) @@ -1970,7 +1964,7 @@ void EncodePageHeaders(device_span pages, // TODO: single thread task. No need for 128 threads/block. Earlier it used to employ rest of the // threads to coop load structs gpuEncodePageHeaders<<>>( - pages, comp_stat, page_stats, chunk_stats); + pages, comp_stats, page_stats, chunk_stats); } void GatherPages(device_span chunks, diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 53b82c73a35..057b9a87214 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -378,7 +378,7 @@ struct EncPage { uint32_t num_leaf_values; //!< Values in page. Different from num_rows in case of nested types uint32_t num_values; //!< Number of def/rep level values in page. 
Includes null/empty elements in //!< non-leaf levels - gpu_inflate_status_s* comp_stat; //!< Ptr to compression status + decompress_status* comp_stat; //!< Ptr to compression status }; /** @@ -584,13 +584,15 @@ void InitEncoderPages(cudf::detail::device_2dspan chunks, * @brief Launches kernel for packing column data into parquet pages * * @param[in,out] pages Device array of EncPages (unordered) - * @param[out] comp_in Optionally initializes compressor input params - * @param[out] comp_out Optionally initializes compressor output params + * @param[out] comp_in Compressor input buffers + * @param[out] comp_out Compressor output buffers + * @param[out] comp_stats Compressor statuses * @param[in] stream CUDA stream to use, default 0 */ void EncodePages(device_span pages, - device_span comp_in, - device_span comp_out, + device_span> comp_in, + device_span> comp_out, + device_span comp_stats, rmm::cuda_stream_view stream); /** @@ -605,13 +607,13 @@ void DecideCompression(device_span chunks, rmm::cuda_stream_view * @brief Launches kernel to encode page headers * * @param[in,out] pages Device array of EncPages - * @param[in] comp_out Compressor status or nullptr if no compression + * @param[in] comp_stats Compressor status * @param[in] page_stats Optional page-level statistics to be included in page header * @param[in] chunk_stats Optional chunk-level statistics to be encoded * @param[in] stream CUDA stream to use, default 0 */ void EncodePageHeaders(device_span pages, - device_span comp_out, + device_span comp_stats, device_span page_stats, const statistics_chunk* chunk_stats, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index cfca0bad518..a40993ee2dd 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -24,6 +24,7 @@ #include "compact_protocol_reader.hpp" #include +#include #include #include @@ -38,10 +39,9 @@ #include #include -#include - #include #include +#include #include #include @@ -1050,96 +1050,13 @@ void reader::impl::decode_page_headers(hostdevice_vector& pages.device_to_host(stream, true); } -__global__ void decompress_check_kernel(device_span stats, - bool* any_block_failure) -{ - auto tid = blockIdx.x * blockDim.x + threadIdx.x; - if (tid < stats.size()) { - if (stats[tid].status != 0) { - *any_block_failure = true; // Doesn't need to be atomic - } - } -} - -void decompress_check(device_span stats, - bool* any_block_failure, - rmm::cuda_stream_view stream) -{ - if (stats.empty()) { return; } // early exit for empty stats - - dim3 block(128); - dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast(block.x))); - decompress_check_kernel<<>>(stats, any_block_failure); -} - -__global__ void convert_nvcomp_status(device_span nvcomp_stats, - device_span stats) -{ - auto tid = blockIdx.x * blockDim.x + threadIdx.x; - if (tid < stats.size()) { - stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ?
0 : 1; - } -} - -void snappy_decompress(device_span comp_in, - device_span comp_stat, - size_t max_uncomp_page_size, - rmm::cuda_stream_view stream) +void decompress_check(device_span stats, rmm::cuda_stream_view stream) { - size_t num_comp_pages = comp_in.size(); - size_t temp_size; - - nvcompStatus_t nvcomp_status = - nvcompBatchedSnappyDecompressGetTempSize(num_comp_pages, max_uncomp_page_size, &temp_size); - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, - "Unable to get scratch size for snappy decompression"); - - // Not needed now but nvcomp API makes no promises about future - rmm::device_buffer scratch(temp_size, stream); - // Analogous to comp_in.srcDevice - rmm::device_uvector compressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_in.srcSize - rmm::device_uvector compressed_data_sizes(num_comp_pages, stream); - // Analogous to comp_in.dstDevice - rmm::device_uvector uncompressed_data_ptrs(num_comp_pages, stream); - // Analogous to comp_in.dstSize - rmm::device_uvector uncompressed_data_sizes(num_comp_pages, stream); - - // Analogous to comp_stat.bytes_written - rmm::device_uvector actual_uncompressed_data_sizes(num_comp_pages, stream); - // Convertible to comp_stat.status - rmm::device_uvector statuses(num_comp_pages, stream); - device_span statuses_span(statuses.data(), statuses.size()); - - // Prepare the vectors - auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(), - compressed_data_sizes.begin(), - uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.data()); - thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize); - }); - - nvcomp_status = nvcompBatchedSnappyDecompressAsync(compressed_data_ptrs.data(), - compressed_data_sizes.data(), - uncompressed_data_sizes.data(), - actual_uncompressed_data_sizes.data(), - num_comp_pages, - scratch.data(), - scratch.size(), - uncompressed_data_ptrs.data(), - statuses.data(), - stream.value()); - CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, - "unable to perform snappy decompression"); - - dim3 block(128); - dim3 grid(cudf::util::div_rounding_up_safe(num_comp_pages, static_cast(block.x))); - convert_nvcomp_status<<>>(statuses_span, comp_stat); + CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream), + stats.begin(), + stats.end(), + [] __device__(auto const& stat) { return stat.status == 0; }), + "Error during decompression"); } /** @@ -1175,9 +1092,9 @@ rmm::device_buffer reader::impl::decompress_page_data( int32_t max_decompressed_size; }; - std::array codecs{codec_stats{parquet::GZIP, 0, 0}, - codec_stats{parquet::SNAPPY, 0, 0}, - codec_stats{parquet::BROTLI, 0, 0}}; + std::array codecs{codec_stats{parquet::GZIP, 0, 0}, + codec_stats{parquet::SNAPPY, 0, 0}, + codec_stats{parquet::BROTLI, 0, 0}}; auto is_codec_supported = [&codecs](int8_t codec) { if (codec == parquet::UNCOMPRESSED) return true; @@ -1207,91 +1124,73 @@ rmm::device_buffer reader::impl::decompress_page_data( // Dispatch batches of pages to decompress for each codec rmm::device_buffer decomp_pages(total_decomp_size, stream); - hostdevice_vector inflate_in(0, num_comp_pages, stream); - hostdevice_vector inflate_out(0, num_comp_pages, stream); - hostdevice_vector any_block_failure(1, stream); - any_block_failure[0] = false; - any_block_failure.host_to_device(stream); + std::vector> comp_in; + comp_in.reserve(num_comp_pages); + std::vector> comp_out; + 
comp_out.reserve(num_comp_pages); - device_span inflate_in_view(inflate_in.device_ptr(), inflate_in.size()); - device_span inflate_out_view(inflate_out.device_ptr(), inflate_out.size()); + rmm::device_uvector comp_stats(num_comp_pages, stream); + thrust::fill(rmm::exec_policy(stream), + comp_stats.begin(), + comp_stats.end(), + decompress_status{0, static_cast(-1000), 0}); size_t decomp_offset = 0; - int32_t argc = 0; + int32_t start_pos = 0; for (const auto& codec : codecs) { - if (codec.num_pages > 0) { - int32_t start_pos = argc; - - for_each_codec_page(codec.compression_type, [&](size_t page) { - auto dst_base = static_cast(decomp_pages.data()); - inflate_in[argc].srcDevice = pages[page].page_data; - inflate_in[argc].srcSize = pages[page].compressed_page_size; - inflate_in[argc].dstDevice = dst_base + decomp_offset; - inflate_in[argc].dstSize = pages[page].uncompressed_page_size; - - inflate_out[argc].bytes_written = 0; - inflate_out[argc].status = static_cast(-1000); - inflate_out[argc].reserved = 0; - - pages[page].page_data = static_cast(inflate_in[argc].dstDevice); - decomp_offset += inflate_in[argc].dstSize; - argc++; - }); + if (codec.num_pages == 0) { continue; } - CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_in.device_ptr(start_pos), - inflate_in.host_ptr(start_pos), - sizeof(decltype(inflate_in)::value_type) * (argc - start_pos), - cudaMemcpyHostToDevice, - stream.value())); - CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_out.device_ptr(start_pos), - inflate_out.host_ptr(start_pos), - sizeof(decltype(inflate_out)::value_type) * (argc - start_pos), - cudaMemcpyHostToDevice, - stream.value())); - - switch (codec.compression_type) { - case parquet::GZIP: - CUDF_CUDA_TRY(gpuinflate(inflate_in.device_ptr(start_pos), - inflate_out.device_ptr(start_pos), - argc - start_pos, - 1, - stream)) - break; - case parquet::SNAPPY: - if (nvcomp_integration::is_stable_enabled()) { - snappy_decompress(inflate_in_view.subspan(start_pos, argc - start_pos), - inflate_out_view.subspan(start_pos, argc - start_pos), - codec.max_decompressed_size, - stream); - } else { - CUDF_CUDA_TRY(gpu_unsnap(inflate_in.device_ptr(start_pos), - inflate_out.device_ptr(start_pos), - argc - start_pos, - stream)); - } - break; - case parquet::BROTLI: - CUDF_CUDA_TRY(gpu_debrotli(inflate_in.device_ptr(start_pos), - inflate_out.device_ptr(start_pos), - debrotli_scratch.data(), - debrotli_scratch.size(), - argc - start_pos, - stream)); - break; - default: CUDF_FAIL("Unexpected decompression dispatch"); break; - } - CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_out.host_ptr(start_pos), - inflate_out.device_ptr(start_pos), - sizeof(decltype(inflate_out)::value_type) * (argc - start_pos), - cudaMemcpyDeviceToHost, - stream.value())); + for_each_codec_page(codec.compression_type, [&](size_t page) { + auto dst_base = static_cast(decomp_pages.data()); + comp_in.emplace_back(pages[page].page_data, + static_cast(pages[page].compressed_page_size)); + comp_out.emplace_back(dst_base + decomp_offset, + static_cast(pages[page].uncompressed_page_size)); + + pages[page].page_data = static_cast(comp_out.back().data()); + decomp_offset += comp_out.back().size(); + }); + + host_span const> comp_in_view{comp_in.data() + start_pos, + codec.num_pages}; + auto const d_comp_in = cudf::detail::make_device_uvector_async(comp_in_view, stream); + host_span const> comp_out_view(comp_out.data() + start_pos, + codec.num_pages); + auto const d_comp_out = cudf::detail::make_device_uvector_async(comp_out_view, stream); + device_span d_comp_stats_view(comp_stats.data() + 
start_pos, + codec.num_pages); + + switch (codec.compression_type) { + case parquet::GZIP: + gpuinflate(d_comp_in, d_comp_out, d_comp_stats_view, gzip_header_included::YES, stream); + break; + case parquet::SNAPPY: + if (nvcomp_integration::is_stable_enabled()) { + nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY, + d_comp_in, + d_comp_out, + d_comp_stats_view, + codec.max_decompressed_size, + stream); + } else { + gpu_unsnap(d_comp_in, d_comp_out, d_comp_stats_view, stream); + } + break; + case parquet::BROTLI: + gpu_debrotli(d_comp_in, + d_comp_out, + d_comp_stats_view, + debrotli_scratch.data(), + debrotli_scratch.size(), + stream); + break; + default: CUDF_FAIL("Unexpected decompression dispatch"); break; } + start_pos += codec.num_pages; } - decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream); - any_block_failure.device_to_host(stream, true); // synchronizes stream - CUDF_EXPECTS(not any_block_failure[0], "Error during decompression"); + decompress_check(comp_stats, stream); // Update the page information in device memory with the updated value of // page_data; it now points to the uncompressed data buffer diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 75a50714407..dbbd39fb508 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -984,8 +984,9 @@ void writer::impl::init_encoder_pages(hostdevice_2dvector& stream.synchronize(); } -void snappy_compress(device_span comp_in, - device_span comp_stat, +void snappy_compress(device_span const> comp_in, + device_span const> comp_out, + device_span comp_stats, size_t max_page_uncomp_data_size, rmm::cuda_stream_view stream) { @@ -1012,16 +1013,20 @@ void snappy_compress(device_span comp_in, // the space allocated unless one uses the API nvcompBatchedSnappyCompressGetOutputSize() // Prepare the vectors - auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), - uncompressed_data_sizes.begin(), - compressed_data_ptrs.begin()); + auto comp_it = + thrust::make_zip_iterator(uncompressed_data_ptrs.begin(), uncompressed_data_sizes.begin()); + thrust::transform( + rmm::exec_policy(stream), + comp_in.begin(), + comp_in.end(), + comp_it, + [] __device__(auto const& in) { return thrust::make_tuple(in.data(), in.size()); }); + thrust::transform(rmm::exec_policy(stream), - comp_in.begin(), - comp_in.end(), - comp_it, - [] __device__(gpu_inflate_input_s in) { - return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice); - }); + comp_out.begin(), + comp_out.end(), + compressed_data_ptrs.begin(), + [] __device__(auto const& out) { return out.data(); }); nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(), uncompressed_data_sizes.data(), max_page_uncomp_data_size, @@ -1041,9 +1046,9 @@ void snappy_compress(device_span comp_in, thrust::transform(rmm::exec_policy(stream), compressed_bytes_written.begin(), compressed_bytes_written.end(), - comp_stat.begin(), + comp_stats.begin(), [] __device__(size_t size) { - gpu_inflate_status_s status{}; + decompress_status status{}; status.bytes_written = size; return status; }); @@ -1051,9 +1056,9 @@ void snappy_compress(device_span comp_in, } catch (...) 
{ // If we reach this then there was an error in compressing so set an error status for each page thrust::for_each(rmm::exec_policy(stream), - comp_stat.begin(), - comp_stat.end(), - [] __device__(gpu_inflate_status_s & stat) { stat.status = 1; }); + comp_stats.begin(), + comp_stats.end(), + [] __device__(decompress_status & stat) { stat.status = 1; }); }; } @@ -1077,19 +1082,17 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks uint32_t max_comp_pages = (compression_ != parquet::Compression::UNCOMPRESSED) ? pages_in_batch : 0; - rmm::device_uvector compression_input(max_comp_pages, stream); - rmm::device_uvector compression_status(max_comp_pages, stream); - - device_span comp_in{compression_input.data(), compression_input.size()}; - device_span comp_stat{compression_status.data(), compression_status.size()}; + rmm::device_uvector> comp_in(max_comp_pages, stream); + rmm::device_uvector> comp_out(max_comp_pages, stream); + rmm::device_uvector comp_stats(max_comp_pages, stream); - gpu::EncodePages(batch_pages, comp_in, comp_stat, stream); + gpu::EncodePages(batch_pages, comp_in, comp_out, comp_stats, stream); switch (compression_) { case parquet::Compression::SNAPPY: if (nvcomp_integration::is_stable_enabled()) { - snappy_compress(comp_in, comp_stat, max_page_uncomp_data_size, stream); + snappy_compress(comp_in, comp_out, comp_stats, max_page_uncomp_data_size, stream); } else { - CUDF_CUDA_TRY(gpu_snap(comp_in.data(), comp_stat.data(), pages_in_batch, stream)); + gpu_snap(comp_in, comp_out, comp_stats, stream); } break; default: break; @@ -1098,7 +1101,7 @@ void writer::impl::encode_pages(hostdevice_2dvector& chunks // chunk-level auto d_chunks_in_batch = chunks.device_view().subspan(first_rowgroup, rowgroups_in_batch); DecideCompression(d_chunks_in_batch.flat_view(), stream); - EncodePageHeaders(batch_pages, comp_stat, batch_pages_stats, chunk_stats, stream); + EncodePageHeaders(batch_pages, comp_stats, batch_pages_stats, chunk_stats, stream); GatherPages(d_chunks_in_batch.flat_view(), pages, stream); auto h_chunks_in_batch = chunks.host_view().subspan(first_rowgroup, rowgroups_in_batch); diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index a754f7cf7d3..30c7b6ec326 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -51,10 +51,10 @@ class hostdevice_vector { } explicit hostdevice_vector(size_t initial_size, size_t max_size, rmm::cuda_stream_view stream) - : num_elements(initial_size), max_elements(max_size) + : max_elements(max_size), num_elements(initial_size) { if (max_elements != 0) { - CUDF_CUDA_TRY(cudaMallocHost(&h_data, sizeof(T) * max_elements)); + CUDF_CUDA_TRY(cudaMallocHost(reinterpret_cast(&h_data), sizeof(T) * max_elements)); d_data.resize(sizeof(T) * max_elements, stream); } } @@ -62,7 +62,7 @@ class hostdevice_vector { ~hostdevice_vector() { if (max_elements != 0) { - auto const free_result = cudaFreeHost(h_data); + [[maybe_unused]] auto const free_result = cudaFreeHost(h_data); assert(free_result == cudaSuccess); } } diff --git a/cpp/tests/io/comp/decomp_test.cpp b/cpp/tests/io/comp/decomp_test.cpp index dd00b201df9..a325cadf6a5 100644 --- a/cpp/tests/io/comp/decomp_test.cpp +++ b/cpp/tests/io/comp/decomp_test.cpp @@ -15,6 +15,7 @@ */ #include +#include #include @@ -24,6 +25,8 @@ #include +using cudf::device_span; + /** * @brief Base test fixture for decompression * @@ -32,19 +35,6 @@ */ template struct DecompressTest : public cudf::test::BaseFixture { - 
void SetUp() override - { - ASSERT_CUDA_SUCCEEDED(cudaMallocHost((void**)&inf_args, sizeof(cudf::io::gpu_inflate_input_s))); - ASSERT_CUDA_SUCCEEDED( - cudaMallocHost((void**)&inf_stat, sizeof(cudf::io::gpu_inflate_status_s))); - } - - void TearDown() override - { - ASSERT_CUDA_SUCCEEDED(cudaFreeHost(inf_stat)); - ASSERT_CUDA_SUCCEEDED(cudaFreeHost(inf_args)); - } - std::vector vector_from_string(const char* str) const { return std::vector(reinterpret_cast(str), @@ -55,49 +45,43 @@ struct DecompressTest : public cudf::test::BaseFixture { const uint8_t* compressed, size_t compressed_size) { - rmm::device_buffer src{compressed, compressed_size, rmm::cuda_stream_default}; - rmm::device_buffer dst{decompressed->size(), rmm::cuda_stream_default}; - - inf_args->srcDevice = static_cast(src.data()); - inf_args->dstDevice = static_cast(dst.data()); - inf_args->srcSize = src.size(); - inf_args->dstSize = dst.size(); - rmm::device_uvector d_inf_args(1, rmm::cuda_stream_default); - rmm::device_uvector d_inf_stat(1, rmm::cuda_stream_default); - ASSERT_CUDA_SUCCEEDED(cudaMemcpyAsync(d_inf_args.data(), - inf_args, - sizeof(cudf::io::gpu_inflate_input_s), - cudaMemcpyHostToDevice, - 0)); - ASSERT_CUDA_SUCCEEDED(cudaMemcpyAsync(d_inf_stat.data(), - inf_stat, - sizeof(cudf::io::gpu_inflate_status_s), - cudaMemcpyHostToDevice, - 0)); - ASSERT_CUDA_SUCCEEDED( - static_cast(this)->dispatch(d_inf_args.data(), d_inf_stat.data())); - ASSERT_CUDA_SUCCEEDED(cudaMemcpyAsync(inf_stat, - d_inf_stat.data(), - sizeof(cudf::io::gpu_inflate_status_s), - cudaMemcpyDeviceToHost, - 0)); - ASSERT_CUDA_SUCCEEDED(cudaMemcpyAsync( - decompressed->data(), inf_args->dstDevice, inf_args->dstSize, cudaMemcpyDeviceToHost, 0)); - ASSERT_CUDA_SUCCEEDED(cudaStreamSynchronize(0)); + auto stream = rmm::cuda_stream_default; + rmm::device_buffer src{compressed, compressed_size, stream}; + rmm::device_uvector dst{decompressed->size(), stream}; + + hostdevice_vector> inf_in(1, stream); + inf_in[0] = {static_cast(src.data()), src.size()}; + inf_in.host_to_device(stream); + + hostdevice_vector> inf_out(1, stream); + inf_out[0] = dst; + inf_out.host_to_device(stream); + + hostdevice_vector inf_stat(1, stream); + inf_stat[0] = {}; + inf_stat.host_to_device(stream); + + static_cast(this)->dispatch(inf_in, inf_out, inf_stat); + cudaMemcpyAsync( + decompressed->data(), dst.data(), dst.size(), cudaMemcpyDeviceToHost, stream.value()); + inf_stat.device_to_host(stream, true); + ASSERT_EQ(inf_stat[0].status, 0); } - - cudf::io::gpu_inflate_input_s* inf_args = nullptr; - cudf::io::gpu_inflate_status_s* inf_stat = nullptr; }; /** * @brief Derived fixture for GZIP decompression */ struct GzipDecompressTest : public DecompressTest { - cudaError_t dispatch(cudf::io::gpu_inflate_input_s* d_inf_args, - cudf::io::gpu_inflate_status_s* d_inf_stat) + void dispatch(device_span> d_inf_in, + device_span> d_inf_out, + device_span d_inf_stat) { - return cudf::io::gpuinflate(d_inf_args, d_inf_stat, 1, 1, rmm::cuda_stream_default); + cudf::io::gpuinflate(d_inf_in, + d_inf_out, + d_inf_stat, + cudf::io::gzip_header_included::YES, + rmm::cuda_stream_default); } }; @@ -105,10 +89,11 @@ struct GzipDecompressTest : public DecompressTest { * @brief Derived fixture for Snappy decompression */ struct SnappyDecompressTest : public DecompressTest { - cudaError_t dispatch(cudf::io::gpu_inflate_input_s* d_inf_args, - cudf::io::gpu_inflate_status_s* d_inf_stat) + void dispatch(device_span> d_inf_in, + device_span> d_inf_out, + device_span d_inf_stat) { - return 
cudf::io::gpu_unsnap(d_inf_args, d_inf_stat, 1, rmm::cuda_stream_default); + cudf::io::gpu_unsnap(d_inf_in, d_inf_out, d_inf_stat, rmm::cuda_stream_default); } }; @@ -116,14 +101,19 @@ struct SnappyDecompressTest : public DecompressTest { * @brief Derived fixture for Brotli decompression */ struct BrotliDecompressTest : public DecompressTest { - cudaError_t dispatch(cudf::io::gpu_inflate_input_s* d_inf_args, - cudf::io::gpu_inflate_status_s* d_inf_stat) + void dispatch(device_span> d_inf_in, + device_span> d_inf_out, + device_span d_inf_stat) { rmm::device_buffer d_scratch{cudf::io::get_gpu_debrotli_scratch_size(1), rmm::cuda_stream_default}; - return cudf::io::gpu_debrotli( - d_inf_args, d_inf_stat, d_scratch.data(), d_scratch.size(), 1, rmm::cuda_stream_default); + cudf::io::gpu_debrotli(d_inf_in, + d_inf_out, + d_inf_stat, + d_scratch.data(), + d_scratch.size(), + rmm::cuda_stream_default); } };