Commit

decompress_page_data clean up
vuule committed Apr 27, 2022
1 parent 268438e commit 2bd4f4e
Showing 1 changed file with 58 additions and 108 deletions.
cpp/src/io/parquet/reader_impl.cu (166 changes: 58 additions & 108 deletions)
@@ -41,6 +41,7 @@
 
 #include <thrust/for_each.h>
 #include <thrust/iterator/zip_iterator.h>
+#include <thrust/logical.h>
 #include <thrust/transform.h>
 #include <thrust/tuple.h>
 
@@ -1049,26 +1050,13 @@ void reader::impl::decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>&
   pages.device_to_host(stream, true);
 }
 
-__global__ void decompress_check_kernel(device_span<decompress_status const> stats,
-                                        bool* any_block_failure)
-{
-  auto tid = blockIdx.x * blockDim.x + threadIdx.x;
-  if (tid < stats.size()) {
-    if (stats[tid].status != 0) {
-      *any_block_failure = true;  // Doesn't need to be atomic
-    }
-  }
-}
-
-void decompress_check(device_span<decompress_status> stats,
-                      bool* any_block_failure,
-                      rmm::cuda_stream_view stream)
+void decompress_check(device_span<decompress_status const> stats, rmm::cuda_stream_view stream)
 {
-  if (stats.empty()) { return; }  // early exit for empty stats
-
-  dim3 block(128);
-  dim3 grid(cudf::util::div_rounding_up_safe(stats.size(), static_cast<size_t>(block.x)));
-  decompress_check_kernel<<<grid, block, 0, stream.value()>>>(stats, any_block_failure);
+  CUDF_EXPECTS(thrust::all_of(rmm::exec_policy(stream),
+                              stats.begin(),
+                              stats.end(),
+                              [] __device__(auto const& stat) { return stat.status == 0; }),
+               "Error during decompression");
 }
 
 /**
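
The new decompress_check collapses the old two-step check (a custom kernel writing into a host-visible any_block_failure flag, followed by a synchronizing copy back) into a single thrust::all_of reduction over the status array, with CUDF_EXPECTS throwing if any page failed. For reference, a minimal self-contained sketch of the same pattern (not cudf code; the status_t struct and its values are stand-ins, and device lambdas require nvcc's --extended-lambda flag):

// check.cu -- build with: nvcc --extended-lambda check.cu
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/logical.h>

#include <cstdint>
#include <cstdio>

// Stand-in for cudf's decompress_status; 0 means success.
struct status_t {
  uint32_t status;
};

int main()
{
  thrust::device_vector<status_t> stats(4, status_t{0});
  stats[2] = status_t{1};  // inject one failed page

  // The reduction runs on the device; only the final bool reaches the host,
  // so no scratch flag or explicit device-to-host copy is needed.
  bool const all_ok = thrust::all_of(
    thrust::device, stats.begin(), stats.end(),
    [] __device__(status_t const s) { return s.status == 0; });

  std::printf("all_ok = %s\n", all_ok ? "true" : "false");  // prints "all_ok = false"
}

One subtlety: thrust::all_of is a synchronous algorithm, so the check still blocks the stream, but the old early-exit-for-empty-stats branch is no longer needed, since all_of over an empty range is simply true.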
Expand Down Expand Up @@ -1136,104 +1124,66 @@ rmm::device_buffer reader::impl::decompress_page_data(

// Dispatch batches of pages to decompress for each codec
rmm::device_buffer decomp_pages(total_decomp_size, stream);
hostdevice_vector<device_span<uint8_t const>> inflate_in(0, num_comp_pages, stream);
hostdevice_vector<device_span<uint8_t>> inflate_out(0, num_comp_pages, stream);
hostdevice_vector<decompress_status> inflate_stats(0, num_comp_pages, stream);

hostdevice_vector<bool> any_block_failure(1, stream);
any_block_failure[0] = false;
any_block_failure.host_to_device(stream);

device_span<device_span<uint8_t const>> inflate_in_view(inflate_in.device_ptr(),
inflate_in.size());
device_span<device_span<uint8_t>> inflate_out_view(inflate_out.device_ptr(), inflate_in.size());
device_span<decompress_status> inflate_stats_view(inflate_stats.device_ptr(),
inflate_stats.size());
rmm::device_uvector<decompress_status> inflate_stats(num_comp_pages, stream);
thrust::fill(rmm::exec_policy(stream),
inflate_stats.begin(),
inflate_stats.end(),
decompress_status{0, static_cast<uint32_t>(-1000), 0});

size_t decomp_offset = 0;
int32_t argc = 0;
int32_t start_pos = 0;
for (const auto& codec : codecs) {
if (codec.num_pages > 0) {
int32_t start_pos = argc;

for_each_codec_page(codec.compression_type, [&](size_t page) {
auto dst_base = static_cast<uint8_t*>(decomp_pages.data());
inflate_in[argc] = {pages[page].page_data,
static_cast<size_t>(pages[page].compressed_page_size)};
inflate_out[argc] = {dst_base + decomp_offset,
static_cast<size_t>(pages[page].uncompressed_page_size)};

inflate_stats[argc].bytes_written = 0;
inflate_stats[argc].status = static_cast<uint32_t>(-1000);
inflate_stats[argc].reserved = 0;

pages[page].page_data = static_cast<uint8_t*>(inflate_out[argc].data());
decomp_offset += inflate_out[argc].size();
argc++;
});
if (codec.num_pages == 0) { continue; }

std::vector<device_span<uint8_t const>> inflate_in;
std::vector<device_span<uint8_t>> inflate_out;
for_each_codec_page(codec.compression_type, [&](size_t page) {
auto dst_base = static_cast<uint8_t*>(decomp_pages.data());
inflate_in.emplace_back(pages[page].page_data,
static_cast<size_t>(pages[page].compressed_page_size));
inflate_out.emplace_back(dst_base + decomp_offset,
static_cast<size_t>(pages[page].uncompressed_page_size));

pages[page].page_data = static_cast<uint8_t*>(inflate_out.back().data());
decomp_offset += inflate_out.back().size();
});

CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_in.device_ptr(start_pos),
inflate_in.host_ptr(start_pos),
sizeof(decltype(inflate_in)::value_type) * (argc - start_pos),
cudaMemcpyHostToDevice,
stream.value()));
CUDF_CUDA_TRY(cudaMemcpyAsync(inflate_out.device_ptr(start_pos),
inflate_out.host_ptr(start_pos),
sizeof(decltype(inflate_out)::value_type) * (argc - start_pos),
cudaMemcpyHostToDevice,
stream.value()));
CUDF_CUDA_TRY(
cudaMemcpyAsync(inflate_stats.device_ptr(start_pos),
inflate_stats.host_ptr(start_pos),
sizeof(decltype(inflate_stats)::value_type) * (argc - start_pos),
cudaMemcpyHostToDevice,
stream.value()));

switch (codec.compression_type) {
case parquet::GZIP:
gpuinflate(inflate_in_view.subspan(start_pos, argc - start_pos),
inflate_out_view.subspan(start_pos, argc - start_pos),
inflate_stats_view.subspan(start_pos, argc - start_pos),
gzip_header_included::YES,
auto const d_inflate_in = cudf::detail::make_device_uvector_async(inflate_in, stream);
auto const d_inflate_out = cudf::detail::make_device_uvector_async(inflate_out, stream);
device_span<decompress_status> inflate_stats_view(inflate_stats.data() + start_pos,
inflate_in.size());
switch (codec.compression_type) {
case parquet::GZIP:
gpuinflate(
d_inflate_in, d_inflate_out, inflate_stats_view, gzip_header_included::YES, stream);
break;
case parquet::SNAPPY:
if (nvcomp_integration::is_stable_enabled()) {
nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
d_inflate_in,
d_inflate_out,
inflate_stats_view,
codec.max_decompressed_size,
stream);
} else {
gpu_unsnap(d_inflate_in, d_inflate_out, inflate_stats_view, stream);
}
break;
case parquet::BROTLI:
gpu_debrotli(d_inflate_in,
d_inflate_out,
inflate_stats_view,
debrotli_scratch.data(),
debrotli_scratch.size(),
stream);
break;
case parquet::SNAPPY:
if (nvcomp_integration::is_stable_enabled()) {
nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
inflate_in_view.subspan(start_pos, argc - start_pos),
inflate_out_view.subspan(start_pos, argc - start_pos),
inflate_stats_view.subspan(start_pos, argc - start_pos),
codec.max_decompressed_size,
stream);
} else {
gpu_unsnap(inflate_in_view.subspan(start_pos, argc - start_pos),
inflate_out_view.subspan(start_pos, argc - start_pos),
inflate_stats_view.subspan(start_pos, argc - start_pos),
stream);
}
break;
case parquet::BROTLI:
gpu_debrotli(inflate_in_view.subspan(start_pos, argc - start_pos),
inflate_out_view.subspan(start_pos, argc - start_pos),
inflate_stats_view.subspan(start_pos, argc - start_pos),
debrotli_scratch.data(),
debrotli_scratch.size(),
stream);
break;
default: CUDF_FAIL("Unexpected decompression dispatch"); break;
}
CUDF_CUDA_TRY(
cudaMemcpyAsync(inflate_stats.host_ptr(start_pos),
inflate_stats.device_ptr(start_pos),
sizeof(decltype(inflate_stats)::value_type) * (argc - start_pos),
cudaMemcpyDeviceToHost,
stream.value()));
break;
default: CUDF_FAIL("Unexpected decompression dispatch"); break;
}
start_pos += inflate_in.size();
}

decompress_check(inflate_stats_view, any_block_failure.device_ptr(), stream);
any_block_failure.device_to_host(stream, true); // synchronizes stream
CUDF_EXPECTS(not any_block_failure[0], "Error during decompression");
decompress_check(inflate_stats, stream);

// Update the page information in device memory with the updated value of
// page_data; it now points to the uncompressed data buffer
Expand Down
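
The staging change follows the same simplification: per-codec input/output spans are gathered in plain std::vectors and shipped to the device in one shot with cudf::detail::make_device_uvector_async (one allocation plus one async copy per batch), and the status array is initialized on the device with thrust::fill instead of per-element host writes followed by a bulk upload. A simplified sketch of both pieces (not the cudf implementation; to_device_async is a hypothetical stand-in for make_device_uvector_async, and the decompress_status layout simply mirrors the aggregate initializer in the diff):

// staging.cu -- sketch only; needs the RMM headers and a CUDA toolkit
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/fill.h>

#include <cuda_runtime.h>

#include <cstdint>
#include <vector>

// Layout mirroring the initializer used in the diff; 0 status means success.
struct decompress_status {
  uint64_t bytes_written;
  uint32_t status;
  uint32_t reserved;
};

// Hypothetical stand-in for cudf::detail::make_device_uvector_async:
// one allocation and one async H2D copy for the whole batch, replacing the
// persistent hostdevice_vector and the hand-rolled per-codec cudaMemcpyAsync calls.
template <typename T>
rmm::device_uvector<T> to_device_async(std::vector<T> const& host,
                                       rmm::cuda_stream_view stream)
{
  rmm::device_uvector<T> d(host.size(), stream);
  cudaMemcpyAsync(d.data(), host.data(), host.size() * sizeof(T),
                  cudaMemcpyHostToDevice, stream.value());
  return d;  // `host` must stay alive until the copy completes
}

int main()
{
  rmm::cuda_stream_view stream{};  // default stream

  // Device-side initialization: replaces the old host loop that set
  // bytes_written/status/reserved for every page before uploading.
  rmm::device_uvector<decompress_status> stats(128, stream);
  thrust::fill(rmm::exec_policy(stream), stats.begin(), stats.end(),
               decompress_status{0, static_cast<uint32_t>(-1000), 0});

  // Batch staging: build the descriptor list on the host, copy it once.
  std::vector<uint32_t> sizes{100, 200, 300};
  auto d_sizes = to_device_async(sizes, stream);

  stream.synchronize();  // ensure the copy finishes before `sizes` is destroyed
  return 0;
}

Because each codec batch is a fresh pair of vectors, the old argc bookkeeping disappears: start_pos advances by inflate_in.size() per codec, which is all that is needed to slice inflate_stats.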
