Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make snappy decompress check more efficient #9995

Merged
merged 11 commits into from
Mar 21, 2022
80 changes: 60 additions & 20 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
#include <io/utilities/config_utils.hpp>
#include <io/utilities/time_utils.cuh>

#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/bit.hpp>
Expand Down Expand Up @@ -259,6 +260,39 @@ auto decimal_column_type(std::vector<std::string> const& float64_columns,

} // namespace

/**
 * @brief Kernel that scans per-block decompression results and raises a single failure flag.
 *
 * Launched with a 1D grid covering at least `stats.size()` threads; each thread inspects
 * one status entry and threads past the end exit immediately.
 *
 * @param stats Per-block decompression status entries (status == 0 means success)
 * @param any_block_failure Device flag set to true when any block failed; the plain
 * (non-atomic) store is safe because every writer stores the same value
 */
__global__ void decompress_check_kernel(device_span<gpu_inflate_status_s const> stats,
                                        bool* any_block_failure)
{
  // Widen to 64-bit before multiplying: blockIdx.x * blockDim.x is 32-bit arithmetic
  // and could wrap on very large launches before the bounds check runs.
  auto const tid = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (tid < stats.size()) {
    if (stats[tid].status != 0) {
      *any_block_failure = true;  // Doesn't need to be atomic
    }
  }
}

/**
 * @brief Launches a device-side check of all decompression results.
 *
 * Sets `*any_block_failure` to true if any entry in @p stats reports a non-zero status.
 * No-op when @p stats is empty (a zero-sized grid launch would be invalid).
 *
 * @param stats Per-block decompression status entries to inspect
 * @param any_block_failure Device pointer to the failure flag to be set on any error
 * @param stream CUDA stream on which to launch the check kernel
 */
void decompress_check(device_span<gpu_inflate_status_s> stats,
                      bool* any_block_failure,
                      rmm::cuda_stream_view stream)
{
  if (stats.empty()) { return; }  // early exit for empty stats

  dim3 block(128);
  dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast<size_t>(block.x)));
  decompress_check_kernel<<<grid, block, 0, stream.value()>>>(stats, any_block_failure);
}

/**
 * @brief Kernel that translates per-block nvcomp results into `gpu_inflate_status_s`.
 *
 * Each thread converts one block's nvcomp status code (0 on success, 1 otherwise) and
 * records the number of bytes that block actually decompressed to.
 *
 * @param nvcomp_stats Per-block status codes reported by nvcomp
 * @param actual_uncompressed_sizes Per-block decompressed byte counts reported by nvcomp
 * @param stats Output per-block status records in cudf's inflate-status format
 */
__global__ void convert_nvcomp_status(device_span<nvcompStatus_t const> nvcomp_stats,
                                      device_span<size_t const> actual_uncompressed_sizes,
                                      device_span<gpu_inflate_status_s> stats)
{
  // 64-bit index avoids 32-bit overflow of blockIdx.x * blockDim.x on huge launches
  auto const tid = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (tid < stats.size()) {
    stats[tid].status        = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 0 : 1;
    stats[tid].bytes_written = actual_uncompressed_sizes[tid];
  }
}

void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_stat,
size_t max_uncomp_page_size,
Expand All @@ -281,6 +315,10 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_blocks, stream);
rmm::device_uvector<nvcompStatus_t> statuses(num_blocks, stream);

device_span<size_t const> actual_uncompressed_sizes_span(actual_uncompressed_data_sizes.data(),
actual_uncompressed_data_sizes.size());
device_span<nvcompStatus_t const> statuses_span(statuses.data(), statuses.size());

// Prepare the vectors
auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(),
compressed_data_sizes.begin(),
Expand All @@ -306,19 +344,10 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
stream.value());
CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression");

CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
statuses.begin(),
statuses.end(),
thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)),
"Error during snappy decompression");
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
num_blocks,
[=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) {
comp_stat[i].bytes_written = actual_uncomp_sizes[i];
comp_stat[i].status = 0;
});
dim3 block(128);
dim3 grid(cudf::util::div_rounding_up_safe(num_blocks, static_cast<size_t>(block.x)));
convert_nvcomp_status<<<grid, block, 0, stream.value()>>>(
statuses_span, actual_uncompressed_sizes_span, comp_stat);
}

rmm::device_buffer reader::impl::decompress_stripe_data(
Expand All @@ -332,6 +361,11 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
bool use_base_stride,
rmm::cuda_stream_view stream)
{
// For checking whether we decompress successfully
hostdevice_vector<bool> any_block_failure(1, stream);
any_block_failure[0] = false;
any_block_failure.host_to_device(stream);

// Parse the columns' compressed info
hostdevice_vector<gpu::CompressedStreamInfo> compinfo(0, stream_info.size(), stream);
for (const auto& info : stream_info) {
Expand All @@ -340,6 +374,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
info.length));
}
compinfo.host_to_device(stream);

gpu::ParseCompressedStripeData(compinfo.device_ptr(),
compinfo.size(),
decompressor->GetBlockSize(),
Expand Down Expand Up @@ -391,6 +426,7 @@ rmm::device_buffer reader::impl::decompress_stripe_data(

// Dispatch batches of blocks to decompress
if (num_compressed_blocks > 0) {
device_span<gpu_inflate_status_s> inflate_out_view(inflate_out.data(), num_compressed_blocks);
switch (decompressor->GetKind()) {
case orc::ZLIB:
CUDA_TRY(
Expand All @@ -400,8 +436,6 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
if (nvcomp_integration::is_stable_enabled()) {
device_span<gpu_inflate_input_s> inflate_in_view{inflate_in.data(),
num_compressed_blocks};
device_span<gpu_inflate_status_s> inflate_out_view{inflate_out.data(),
num_compressed_blocks};
snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream);
} else {
CUDA_TRY(
Expand All @@ -410,22 +444,28 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
break;
default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break;
}
decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream);
}
if (num_uncompressed_blocks > 0) {
CUDA_TRY(gpu_copy_uncompressed_blocks(
inflate_in.data() + num_compressed_blocks, num_uncompressed_blocks, stream));
}
gpu::PostDecompressionReassemble(compinfo.device_ptr(), compinfo.size(), stream);

any_block_failure.device_to_host(stream);

compinfo.device_to_host(stream, true);

// We can check on host after stream synchronize
CUDF_EXPECTS(not any_block_failure[0], "Error during decompression");

const size_t num_columns = chunks.size().second;

// Update the stream information with the updated uncompressed info
// TBD: We could update the value from the information we already
// have in stream_info[], but using the gpu results also updates
// max_uncompressed_size to the actual uncompressed size, or zero if
// decompression failed.
compinfo.device_to_host(stream, true);

const size_t num_columns = chunks.size().second;

for (size_t i = 0; i < num_stripes; ++i) {
for (size_t j = 0; j < num_columns; ++j) {
auto& chunk = chunks[i][j];
Expand Down
55 changes: 44 additions & 11 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <io/utilities/config_utils.hpp>
#include <io/utilities/time_utils.cuh>

#include <cudf/detail/utilities/integer_utils.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/table/table.hpp>
#include <cudf/utilities/error.hpp>
Expand Down Expand Up @@ -1035,6 +1036,37 @@ void reader::impl::decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>&
pages.device_to_host(stream, true);
}

/**
 * @brief Kernel that scans per-page decompression results and raises a single failure flag.
 *
 * Launched with a 1D grid covering at least `stats.size()` threads; each thread inspects
 * one status entry and threads past the end exit immediately.
 *
 * @param stats Per-page decompression status entries (status == 0 means success)
 * @param any_block_failure Device flag set to true when any page failed; the plain
 * (non-atomic) store is safe because every writer stores the same value
 */
__global__ void decompress_check_kernel(device_span<gpu_inflate_status_s const> stats,
                                        bool* any_block_failure)
{
  // Widen to 64-bit before multiplying: blockIdx.x * blockDim.x is 32-bit arithmetic
  // and could wrap on very large launches before the bounds check runs.
  auto const tid = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (tid < stats.size()) {
    if (stats[tid].status != 0) {
      *any_block_failure = true;  // Doesn't need to be atomic
    }
  }
}

/**
 * @brief Launches a device-side check of all decompression results.
 *
 * Sets `*any_block_failure` to true if any entry in @p stats reports a non-zero status.
 * No-op when @p stats is empty (a zero-sized grid launch would be invalid).
 *
 * @param stats Per-page decompression status entries to inspect
 * @param any_block_failure Device pointer to the failure flag to be set on any error
 * @param stream CUDA stream on which to launch the check kernel
 */
void decompress_check(device_span<gpu_inflate_status_s> stats,
                      bool* any_block_failure,
                      rmm::cuda_stream_view stream)
{
  if (stats.empty()) { return; }  // early exit for empty stats

  dim3 block(128);
  dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast<size_t>(block.x)));
  decompress_check_kernel<<<grid, block, 0, stream.value()>>>(stats, any_block_failure);
}

/**
 * @brief Kernel that translates per-page nvcomp results into `gpu_inflate_status_s`.
 *
 * Each thread converts one page's nvcomp status code (0 on success, 1 otherwise).
 *
 * @param nvcomp_stats Per-page status codes reported by nvcomp
 * @param stats Output per-page status records in cudf's inflate-status format
 */
__global__ void convert_nvcomp_status(device_span<nvcompStatus_t const> nvcomp_stats,
                                      device_span<gpu_inflate_status_s> stats)
{
  // 64-bit index avoids 32-bit overflow of blockIdx.x * blockDim.x on huge launches
  auto const tid = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (tid < stats.size()) {
    stats[tid].status = nvcomp_stats[tid] == nvcompStatus_t::nvcompSuccess ? 0 : 1;
  }
}

void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_stat,
size_t max_uncomp_page_size,
Expand Down Expand Up @@ -1063,6 +1095,7 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_comp_pages, stream);
// Convertible to comp_stat.status
rmm::device_uvector<nvcompStatus_t> statuses(num_comp_pages, stream);
device_span<nvcompStatus_t const> statuses_span(statuses.data(), statuses.size());

// Prepare the vectors
auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(),
Expand Down Expand Up @@ -1090,16 +1123,9 @@ void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"unable to perform snappy decompression");

CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
uncompressed_data_sizes.begin(),
uncompressed_data_sizes.end(),
actual_uncompressed_data_sizes.begin()),
"Mismatch in expected and actual decompressed size during snappy decompression");
CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
statuses.begin(),
statuses.end(),
thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)),
"Error during snappy decompression");
dim3 block(128);
dim3 grid(cudf::util::div_rounding_up_safe(num_comp_pages, static_cast<size_t>(block.x)));
convert_nvcomp_status<<<grid, block, 0, stream.value()>>>(statuses_span, comp_stat);
}

/**
Expand Down Expand Up @@ -1157,6 +1183,10 @@ rmm::device_buffer reader::impl::decompress_page_data(
hostdevice_vector<gpu_inflate_input_s> inflate_in(0, num_comp_pages, stream);
hostdevice_vector<gpu_inflate_status_s> inflate_out(0, num_comp_pages, stream);

hostdevice_vector<bool> any_block_failure(1, stream);
any_block_failure[0] = false;
any_block_failure.host_to_device(stream);

device_span<gpu_inflate_input_s> inflate_in_view(inflate_in.device_ptr(), inflate_in.size());
device_span<gpu_inflate_status_s> inflate_out_view(inflate_out.device_ptr(), inflate_out.size());

Expand Down Expand Up @@ -1231,7 +1261,10 @@ rmm::device_buffer reader::impl::decompress_page_data(
stream.value()));
}
}
stream.synchronize();

decompress_check(inflate_out_view, any_block_failure.device_ptr(), stream);
any_block_failure.device_to_host(stream, true); // synchronizes stream
CUDF_EXPECTS(not any_block_failure[0], "Error during decompression");

// Update the page information in device memory with the updated value of
// page_data; it now points to the uncompressed data buffer
Expand Down