From 2bd4f4e8c696ee9a69f26a4df54b486b1990be17 Mon Sep 17 00:00:00 2001
From: vuule
Date: Wed, 27 Apr 2022 12:57:33 -0700
Subject: [PATCH] decompress_page_data clean up

---
 cpp/src/io/parquet/reader_impl.cu | 166 +++++++++++-------------------
 1 file changed, 58 insertions(+), 108 deletions(-)

diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu
index df176dff846..dff5da5a43d 100644
--- a/cpp/src/io/parquet/reader_impl.cu
+++ b/cpp/src/io/parquet/reader_impl.cu
@@ -41,6 +41,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -1049,26 +1050,13 @@ void reader::impl::decode_page_headers(hostdevice_vector&
   pages.device_to_host(stream, true);
 }
 
-__global__ void decompress_check_kernel(device_span<decompress_status> stats,
-                                        bool* any_block_failure)
+void decompress_check(device_span<decompress_status> stats, rmm::cuda_stream_view stream)
 {
-  auto tid = blockIdx.x * blockDim.x + threadIdx.x;
-  if (tid < stats.size()) {
-    if (stats[tid].status != 0) {
-      *any_block_failure = true;  // Doesn't need to be atomic
-    }
-  }
-}
-
-void decompress_check(device_span<decompress_status> stats,
-                      bool* any_block_failure,
-                      rmm::cuda_stream_view stream)
-{
-  if (stats.empty()) { return; }  // early exit for empty stats
-
-  dim3 block(128);
-  dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast<size_t>(block.x)));
-  decompress_check_kernel<<<grid, block, 0, stream.value()>>>(stats, any_block_failure);
+  CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream),
+                              stats.begin(),
+                              stats.end(),
+                              [] __device__(auto const& stat) { return stat.status == 0; }),
+               "Error during decompression");
 }
 
 /**
@@ -1136,104 +1124,66 @@ rmm::device_buffer reader::impl::decompress_page_data(
   // Dispatch batches of pages to decompress for each codec
   rmm::device_buffer decomp_pages(total_decomp_size, stream);
 
-  hostdevice_vector<device_span<uint8_t const>> inflate_in(0, num_comp_pages, stream);
-  hostdevice_vector<device_span<uint8_t>> inflate_out(0, num_comp_pages, stream);
-  hostdevice_vector<decompress_status> inflate_stats(0, num_comp_pages, stream);
-  hostdevice_vector<bool> any_block_failure(1, stream);
-  any_block_failure[0] = false;
-  any_block_failure.host_to_device(stream);
-
-  device_span<device_span<uint8_t const>> inflate_in_view(inflate_in.device_ptr(),
-                                                          inflate_in.size());
-  device_span<device_span<uint8_t>> inflate_out_view(inflate_out.device_ptr(), inflate_in.size());
-  device_span<decompress_status> inflate_stats_view(inflate_stats.device_ptr(),
-                                                    inflate_stats.size());
+  rmm::device_uvector<decompress_status> inflate_stats(num_comp_pages, stream);
+  thrust::fill(rmm::exec_policy(stream),
+               inflate_stats.begin(),
+               inflate_stats.end(),
+               decompress_status{0, static_cast<uint32_t>(-1000), 0});
 
   size_t decomp_offset = 0;
-  int32_t argc = 0;
+  int32_t start_pos = 0;
   for (const auto& codec : codecs) {
-    if (codec.num_pages > 0) {
-      int32_t start_pos = argc;
-
-      for_each_codec_page(codec.compression_type, [&](size_t page) {
-        auto dst_base = static_cast<uint8_t*>(decomp_pages.data());
-        inflate_in[argc] = {pages[page].page_data,
-                            static_cast<size_t>(pages[page].compressed_page_size)};
-        inflate_out[argc] = {dst_base + decomp_offset,
-                             static_cast<size_t>(pages[page].uncompressed_page_size)};
-
-        inflate_stats[argc].bytes_written = 0;
-        inflate_stats[argc].status = static_cast<uint32_t>(-1000);
-        inflate_stats[argc].reserved = 0;
-
-        pages[page].page_data = static_cast<uint8_t*>(inflate_out[argc].data());
-        decomp_offset += inflate_out[argc].size();
-        argc++;
-      });
+    if (codec.num_pages == 0) { continue; }
+
+    std::vector<device_span<uint8_t const>> inflate_in;
+    std::vector<device_span<uint8_t>> inflate_out;
+    for_each_codec_page(codec.compression_type, [&](size_t page) {
+      auto dst_base = static_cast<uint8_t*>(decomp_pages.data());
+      inflate_in.emplace_back(pages[page].page_data,
+                              static_cast<size_t>(pages[page].compressed_page_size));
+      inflate_out.emplace_back(dst_base + decomp_offset,
+                               static_cast<size_t>(pages[page].uncompressed_page_size));
+
+      pages[page].page_data = static_cast<uint8_t*>(inflate_out.back().data());
+      decomp_offset += inflate_out.back().size();
+    });
 
-      CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_in.device_ptr(start_pos),
-                                    inflate_in.host_ptr(start_pos),
-                                    sizeof(decltype(inflate_in)::value_type) * (argc - start_pos),
-                                    cudaMemcpyHostToDevice,
-                                    stream.value()));
-      CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_out.device_ptr(start_pos),
-                                    inflate_out.host_ptr(start_pos),
-                                    sizeof(decltype(inflate_out)::value_type) * (argc - start_pos),
-                                    cudaMemcpyHostToDevice,
-                                    stream.value()));
-      CUDF_CUDA_TRY(
-        cudaMemcpyAsync(inflate_stats.device_ptr(start_pos),
-                        inflate_stats.host_ptr(start_pos),
-                        sizeof(decltype(inflate_stats)::value_type) * (argc - start_pos),
-                        cudaMemcpyHostToDevice,
-                        stream.value()));
-
-      switch (codec.compression_type) {
-        case parquet::GZIP:
-          gpuinflate(inflate_in_view.subspan(start_pos, argc - start_pos),
-                     inflate_out_view.subspan(start_pos, argc - start_pos),
-                     inflate_stats_view.subspan(start_pos, argc - start_pos),
-                     gzip_header_included::YES,
+    auto const d_inflate_in = cudf::detail::make_device_uvector_async(inflate_in, stream);
+    auto const d_inflate_out = cudf::detail::make_device_uvector_async(inflate_out, stream);
+    device_span<decompress_status> inflate_stats_view(inflate_stats.data() + start_pos,
+                                                      inflate_in.size());
+    switch (codec.compression_type) {
+      case parquet::GZIP:
+        gpuinflate(
+          d_inflate_in, d_inflate_out, inflate_stats_view, gzip_header_included::YES, stream);
+        break;
+      case parquet::SNAPPY:
+        if (nvcomp_integration::is_stable_enabled()) {
+          nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
+                                     d_inflate_in,
+                                     d_inflate_out,
+                                     inflate_stats_view,
+                                     codec.max_decompressed_size,
+                                     stream);
+        } else {
+          gpu_unsnap(d_inflate_in, d_inflate_out, inflate_stats_view, stream);
+        }
+        break;
+      case parquet::BROTLI:
+        gpu_debrotli(d_inflate_in,
+                     d_inflate_out,
+                     inflate_stats_view,
+                     debrotli_scratch.data(),
+                     debrotli_scratch.size(),
                      stream);
-          break;
-        case parquet::SNAPPY:
-          if (nvcomp_integration::is_stable_enabled()) {
-            nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
-                                       inflate_in_view.subspan(start_pos, argc - start_pos),
-                                       inflate_out_view.subspan(start_pos, argc - start_pos),
-                                       inflate_stats_view.subspan(start_pos, argc - start_pos),
-                                       codec.max_decompressed_size,
-                                       stream);
-          } else {
-            gpu_unsnap(inflate_in_view.subspan(start_pos, argc - start_pos),
-                       inflate_out_view.subspan(start_pos, argc - start_pos),
-                       inflate_stats_view.subspan(start_pos, argc - start_pos),
-                       stream);
-          }
-          break;
-        case parquet::BROTLI:
-          gpu_debrotli(inflate_in_view.subspan(start_pos, argc - start_pos),
-                       inflate_out_view.subspan(start_pos, argc - start_pos),
-                       inflate_stats_view.subspan(start_pos, argc - start_pos),
-                       debrotli_scratch.data(),
-                       debrotli_scratch.size(),
-                       stream);
-          break;
-        default: CUDF_FAIL("Unexpected decompression dispatch"); break;
-      }
-      CUDF_CUDA_TRY(
-        cudaMemcpyAsync(inflate_stats.host_ptr(start_pos),
-                        inflate_stats.device_ptr(start_pos),
-                        sizeof(decltype(inflate_stats)::value_type) * (argc - start_pos),
-                        cudaMemcpyDeviceToHost,
-                        stream.value()));
+        break;
+      default: CUDF_FAIL("Unexpected decompression dispatch"); break;
     }
+    start_pos += inflate_in.size();
   }
 
-  decompress_check(inflate_stats_view, any_block_failure.device_ptr(), stream);
-  any_block_failure.device_to_host(stream, true);  // synchronizes stream
-  CUDF_EXPECTS(not any_block_failure[0], "Error during decompression");
+  decompress_check(inflate_stats, stream);
 
   // Update the page information in device memory with the updated value of
   // page_data; it now points to the uncompressed data buffer
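
For context, the new decompress_check boils down to a device-side all_of over the per-page decompression results, replacing the hand-written reduction kernel. Below is a minimal standalone sketch of that pattern, not part of the patch: it uses a stand-in status_t struct instead of cuDF's decompress_status, plain Thrust instead of rmm::exec_policy/CUDF_EXPECTS, and a named functor instead of the extended __device__ lambda used in the patch.

// sketch.cu - illustrative only; status_t and the sample data are made up for the example
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/logical.h>

#include <cstdint>
#include <iostream>

// Stand-in for the per-block decompression result; only the field the check reads.
struct status_t {
  uint32_t status;  // 0 == success
};

// Named functor so the sketch builds without nvcc's --extended-lambda flag.
struct is_success {
  __host__ __device__ bool operator()(status_t const& s) const { return s.status == 0; }
};

int main()
{
  // Pretend these came back from a batched decompression call; one block failed.
  thrust::device_vector<status_t> stats(4, status_t{0});
  stats[2] = status_t{static_cast<uint32_t>(-1000)};

  // Single device-side reduction over all statuses, mirroring the thrust::all_of check.
  bool const all_ok = thrust::all_of(thrust::device, stats.begin(), stats.end(), is_success{});
  std::cout << (all_ok ? "decompression succeeded" : "error during decompression") << '\n';
  return 0;
}

Compared with the removed decompress_check_kernel plus the any_block_failure flag round-trip, this style keeps the whole check on the device and surfaces a single host-side result, at the cost of requiring extended-lambda support (or a functor, as sketched here).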