Optimization to decoding of parquet level streams (#13203)
An optimization to the decoding of the definition and repetition level streams in Parquet files. Previously, we decoded these streams using a single warp. With this optimization we decode arbitrarily wide (currently set to 512 threads), which gives a dramatic improvement.

The core of the work is in the new file `rle_stream.cuh`, which encapsulates the decoding into an `rle_stream` object.
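
To illustrate the idea, here is a hedged CUDA sketch of the two-phase pattern, not the actual `rle_stream` interface: the run-queue layout, the 512-thread constant, and the names are assumptions, the parsing handles only repeated runs (real level streams are an RLE/bit-packed hybrid whose literal runs are omitted here), and level widths are assumed to be at most 8 bits.

```cuda
#include <cstdint>

constexpr int MAX_RUNS      = 64;   // shared-memory run queue size (sketch only)
constexpr int BLOCK_THREADS = 512;  // matches the "arbitrarily wide" decode width

struct run {
  int value;   // level value repeated throughout the run
  int offset;  // output position of the run's first value
  int count;   // run length
};

// Phase 1: one thread parses RLE run headers (LEB128 varint; low bit 0 means a
// repeated run). Phase 2: all 512 threads expand the queued runs in parallel.
__global__ void __launch_bounds__(BLOCK_THREADS)
decode_levels(uint8_t const* data, int data_len, int level_bits, int* out, int* out_count)
{
  __shared__ run runs[MAX_RUNS];
  __shared__ int num_runs;

  if (threadIdx.x == 0) {
    int pos = 0, out_pos = 0, n = 0;
    while (pos < data_len && n < MAX_RUNS) {
      // read the varint run header
      uint32_t header = 0;
      int shift       = 0;
      while (true) {
        uint8_t b = data[pos++];
        header |= (b & 0x7f) << shift;
        if ((b & 0x80) == 0) break;
        shift += 7;
      }
      int const count = header >> 1;  // low bit 0: repeated run
      int const value = data[pos];    // assumes level_bits <= 8 (one byte)
      pos += (level_bits + 7) / 8;
      runs[n++] = run{value, out_pos, count};
      out_pos += count;
    }
    num_runs   = n;
    *out_count = out_pos;
  }
  __syncthreads();

  // block-wide expansion: every thread fills a strided slice of each run
  for (int r = 0; r < num_runs; r++) {
    run const rn = runs[r];
    for (int i = threadIdx.x; i < rn.count; i += BLOCK_THREADS) {
      out[rn.offset + i] = rn.value;
    }
  }
}
```

The key point visible even in this sketch: run expansion, which dominates the work, now scales with the block width instead of being capped at 32 lanes.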

This PR only applies the optimization to the `gpuComputePageSizes` kernel, used for preprocessing list columns and for the chunked read case involving strings or lists. In addition, the `UpdatePageSizes` function has been improved to work at the block level instead of using a single warp. Testing with the cudf parquet reader list benchmarks shows as much as a **75%** reduction in time spent in the `gpuComputePageSizes` kernel.
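
The `UpdatePageSizes` widening follows the usual warp-to-block pattern: each thread tallies a strided slice of the decoded levels, and a block-wide reduction combines the tallies. A minimal sketch with CUB follows; the function name and predicate are hypothetical stand-ins, not the actual cudf code, which applies row-bounds logic to the decoded values.

```cuda
#include <cub/block/block_reduce.cuh>

constexpr int BLOCK_THREADS = 512;

// Hypothetical stand-in for the per-page counting done by UpdatePageSizes:
// every thread tallies a strided slice of the decoded level values that pass
// some predicate (here: a definition level marking a non-null leaf), then a
// block-wide reduction replaces the old 32-lane warp-wide one.
__device__ int count_matching_levels(int const* def_levels, int count, int max_def_level)
{
  using block_reduce = cub::BlockReduce<int, BLOCK_THREADS>;
  __shared__ typename block_reduce::TempStorage temp_storage;

  int my_count = 0;
  for (int i = threadIdx.x; i < count; i += BLOCK_THREADS) {
    if (def_levels[i] == max_def_level) { my_count++; }
  }
  // result is valid on thread 0 only
  return block_reduce(temp_storage).Sum(my_count);
}
```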

Future PRs will apply this optimization to the `gpuDecodePageData` kernel.

Leaving as a draft for the moment - more detailed benchmarks and numbers forthcoming, along with some possible parameter tuning.

Benchmark info: a before/after sample from the `parquet_reader_io_compression` suite on an A5000. The kernel time drops from 427 milliseconds to 93 milliseconds. This seems to be a fairly typical situation, although results will certainly vary with the encoded data (run lengths, etc.).

![pq_opt1](https://user-images.githubusercontent.com/56695930/236043918-bcb01c00-d842-46f5-95bd-9579392cda5f.png)


The reader benchmarks that involve this kernel yield some great improvements.

```
parquet_read_decode (A = Before. B = After)
| data_type |      io       | cardinality | run_length | bytes_per_second (A) | bytes_per_second (B) |
|-----------|---------------|-------------|------------|----------------------|---------------------|
|      LIST | DEVICE_BUFFER |           0 |          1 | 5399068099           | 6044036091          |
|      LIST | DEVICE_BUFFER |        1000 |          1 | 5930855807           | 6505889742          |
|      LIST | DEVICE_BUFFER |           0 |         32 | 6862874160           | 7531918407          |
|      LIST | DEVICE_BUFFER |        1000 |         32 | 6781795229           | 7463856554          |
```

```
parquet_read_io_compression (A = Before. B = After)
|      io       | compression | bytes_per_second (A) | bytes_per_second (B) |
|---------------|-------------|----------------------|----------------------|
| DEVICE_BUFFER | SNAPPY      | 307421363            | 393735255            |
| DEVICE_BUFFER | SNAPPY      | 323998549            | 426045725            |
| DEVICE_BUFFER | SNAPPY      | 386112997            | 508751604            |
| DEVICE_BUFFER | SNAPPY      | 381398279            | 498963635            |
```
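
Across these runs, the decode benchmarks improve by roughly 10-12% in throughput, and the Snappy `parquet_read_io_compression` runs by roughly 28-32%.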

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #13203
nvdbaranec authored May 15, 2023
1 parent 403c83f commit 1581773
Showing 8 changed files with 725 additions and 203 deletions.
6 changes: 3 additions & 3 deletions cpp/include/cudf/detail/utilities/integer_utils.hpp
```diff
@@ -1,7 +1,7 @@
 /*
  * Copyright 2019 BlazingDB, Inc.
  * Copyright 2019 Eyal Rozenberg <[email protected]>
- * Copyright (c) 2020-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ namespace util {
  * `modulus` is positive. The safety is in regard to rollover.
  */
 template <typename S>
-S round_up_safe(S number_to_round, S modulus)
+constexpr S round_up_safe(S number_to_round, S modulus)
 {
   auto remainder = number_to_round % modulus;
   if (remainder == 0) { return number_to_round; }
@@ -67,7 +67,7 @@ S round_up_safe(S number_to_round, S modulus)
  * `modulus` is positive and does not check for overflow.
  */
 template <typename S>
-S round_down_safe(S number_to_round, S modulus) noexcept
+constexpr S round_down_safe(S number_to_round, S modulus) noexcept
 {
   auto remainder = number_to_round % modulus;
   auto rounded_down = number_to_round - remainder;
```
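
Making these helpers `constexpr` lets them participate in compile-time constants, for example when sizing fixed buffers. A minimal sketch of what the change enables (hypothetical values, not code from this PR):

```cpp
#include <cudf/detail/utilities/integer_utils.hpp>

// With constexpr, the rounding helpers can be evaluated at compile time.
static_assert(cudf::util::round_up_safe(2048 * 3, 256) == 6144);
static_assert(cudf::util::round_down_safe(10, 8) == 8);
```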
451 changes: 272 additions & 179 deletions cpp/src/io/parquet/page_data.cu

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions cpp/src/io/parquet/page_hdr.cu
```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -365,9 +365,11 @@ __global__ void __launch_bounds__(128)
       // this computation is only valid for flat schemas. for nested schemas,
       // they will be recomputed in the preprocess step by examining repetition and
       // definition levels
-      bs->page.chunk_row = 0;
-      bs->page.num_rows  = 0;
-      bs->page.str_bytes = 0;
+      bs->page.chunk_row           = 0;
+      bs->page.num_rows            = 0;
+      bs->page.skipped_values      = -1;
+      bs->page.skipped_leaf_values = 0;
+      bs->page.str_bytes           = 0;
     }
     num_values = bs->ck.num_values;
     page_info  = bs->ck.page_info;
```
13 changes: 13 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
```diff
@@ -45,6 +45,9 @@ constexpr int MAX_DICT_BITS = 24;
 // Total number of unsigned 24 bit values
 constexpr size_type MAX_DICT_SIZE = (1 << MAX_DICT_BITS) - 1;
 
+// level decode buffer size.
+constexpr int LEVEL_DECODE_BUF_SIZE = 2048;
+
 /**
  * @brief Struct representing an input column in the file.
  */
@@ -193,6 +196,9 @@ struct PageInfo {
   int32_t nesting_info_size;
   PageNestingInfo* nesting;
   PageNestingDecodeInfo* nesting_decode;
+
+  // level decode buffers
+  uint8_t* lvl_decode_buf[level_type::NUM_LEVEL_TYPES];
 };
 
 /**
@@ -284,6 +290,9 @@ struct file_intermediate_data {
   hostdevice_vector<gpu::PageInfo> pages_info{};
   hostdevice_vector<gpu::PageNestingInfo> page_nesting_info{};
   hostdevice_vector<gpu::PageNestingDecodeInfo> page_nesting_decode_info{};
+
+  rmm::device_buffer level_decode_data;
+  int level_type_size;
 };
 
 /**
@@ -451,6 +460,7 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
  *                         computed
  * @param compute_string_sizes If set to true, the str_bytes field in PageInfo will
  *                             be computed
+ * @param level_type_size Size in bytes of the type for level decoding
  * @param stream CUDA stream to use, default 0
  */
 void ComputePageSizes(hostdevice_vector<PageInfo>& pages,
@@ -459,6 +469,7 @@ void ComputePageSizes(hostdevice_vector<PageInfo>& pages,
                       size_t num_rows,
                       bool compute_num_rows,
                       bool compute_string_sizes,
+                      int level_type_size,
                       rmm::cuda_stream_view stream);
 
 /**
@@ -471,12 +482,14 @@ void ComputePageSizes(hostdevice_vector<PageInfo>& pages,
  * @param[in] chunks All chunks to be decoded
  * @param[in] num_rows Total number of rows to read
  * @param[in] min_row Minimum number of rows to read
+ * @param[in] level_type_size Size in bytes of the type for level decoding
 * @param[in] stream CUDA stream to use, default 0
  */
 void DecodePageData(hostdevice_vector<PageInfo>& pages,
                     hostdevice_vector<ColumnChunkDesc> const& chunks,
                     size_t num_rows,
                     size_t min_row,
+                    int level_type_size,
                     rmm::cuda_stream_view stream);
 
 /**
```
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl.cpp
```diff
@@ -121,7 +121,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   chunk_nested_valids.host_to_device(_stream);
   chunk_nested_data.host_to_device(_stream);
 
-  gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _stream);
+  gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _file_itm_data.level_type_size, _stream);
 
   pages.device_to_host(_stream);
   page_nesting.device_to_host(_stream);
```
8 changes: 8 additions & 0 deletions cpp/src/io/parquet/reader_impl.hpp
```diff
@@ -181,6 +181,14 @@ class reader::impl {
    */
   void allocate_nesting_info();
 
+  /**
+   * @brief Allocate space for use when decoding definition/repetition levels.
+   *
+   * One large contiguous buffer of data allocated and
+   * distributed among the PageInfo structs.
+   */
+  void allocate_level_decode_space();
+
   /**
    * @brief Read a chunk of data and return an output table.
    *
```
79 changes: 63 additions & 16 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
```diff
@@ -325,10 +325,11 @@ constexpr bool is_supported_encoding(Encoding enc)
  * @param chunks List of column chunk descriptors
  * @param pages List of page information
  * @param stream CUDA stream used for device memory operations and kernel launches
+ * @returns The size in bytes of level type data required
  */
-void decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
-                         hostdevice_vector<gpu::PageInfo>& pages,
-                         rmm::cuda_stream_view stream)
+int decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
+                        hostdevice_vector<gpu::PageInfo>& pages,
+                        rmm::cuda_stream_view stream)
 {
   // IMPORTANT : if you change how pages are stored within a chunk (dist pages, then data pages),
   // please update preprocess_nested_columns to reflect this.
@@ -340,13 +341,31 @@ void decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
 
   chunks.host_to_device(stream);
   gpu::DecodePageHeaders(chunks.device_ptr(), chunks.size(), stream);
+
+  // compute max bytes needed for level data
+  auto level_bit_size =
+    cudf::detail::make_counting_transform_iterator(0, [chunks = chunks.begin()] __device__(int i) {
+      auto c = chunks[i];
+      return static_cast<int>(
+        max(c.level_bits[gpu::level_type::REPETITION], c.level_bits[gpu::level_type::DEFINITION]));
+    });
+  // max level data bit size.
+  int const max_level_bits = thrust::reduce(rmm::exec_policy(stream),
+                                            level_bit_size,
+                                            level_bit_size + chunks.size(),
+                                            0,
+                                            thrust::maximum<int>());
+  auto const level_type_size = std::max(1, cudf::util::div_rounding_up_safe(max_level_bits, 8));
+
   pages.device_to_host(stream, true);
 
   // validate page encodings
   CUDF_EXPECTS(std::all_of(pages.begin(),
                            pages.end(),
                            [](auto const& page) { return is_supported_encoding(page.encoding); }),
                "Unsupported page encoding detected");
+
+  return level_type_size;
 }
 
 /**
```
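
As a worked example of the returned value: if the widest column in the file needs 10 bits per level (e.g. deeply nested lists), `max_level_bits` is 10 and `level_type_size = max(1, ceil(10 / 8)) = 2`, so levels are decoded as 2-byte values; a flat schema with 0- or 1-bit levels yields `level_type_size = 1`.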
```diff
@@ -565,9 +584,6 @@ void reader::impl::allocate_nesting_info()
   page_nesting_decode_info =
     hostdevice_vector<gpu::PageNestingDecodeInfo>{total_page_nesting_infos, _stream};
 
-  // retrieve from the gpu so we can update
-  pages.device_to_host(_stream, true);
-
   // update pointers in the PageInfos
   int target_page_index = 0;
   int src_info_index    = 0;
@@ -593,9 +609,6 @@
     target_page_index += chunks[idx].num_data_pages;
   }
 
-  // copy back to the gpu
-  pages.host_to_device(_stream);
-
   // fill in
   int nesting_info_index = 0;
   std::map<int, std::pair<std::vector<int>, std::vector<int>>> depth_remapping;
@@ -673,6 +686,30 @@ void reader::impl::allocate_nesting_info()
   page_nesting_decode_info.host_to_device(_stream);
 }
 
+void reader::impl::allocate_level_decode_space()
+{
+  auto& pages = _file_itm_data.pages_info;
+
+  // TODO: this could be made smaller if we ignored dictionary pages and pages with no
+  // repetition data.
+  size_t const per_page_decode_buf_size =
+    LEVEL_DECODE_BUF_SIZE * 2 * _file_itm_data.level_type_size;
+  auto const decode_buf_size = per_page_decode_buf_size * pages.size();
+  _file_itm_data.level_decode_data =
+    rmm::device_buffer(decode_buf_size, _stream, rmm::mr::get_current_device_resource());
+
+  // distribute the buffers
+  uint8_t* buf = static_cast<uint8_t*>(_file_itm_data.level_decode_data.data());
+  for (size_t idx = 0; idx < pages.size(); idx++) {
+    auto& p = pages[idx];
+
+    p.lvl_decode_buf[gpu::level_type::DEFINITION] = buf;
+    buf += (LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size);
+    p.lvl_decode_buf[gpu::level_type::REPETITION] = buf;
+    buf += (LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size);
+  }
+}
+
 std::pair<bool, std::vector<std::future<void>>> reader::impl::create_and_read_column_chunks(
   cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
 {
```
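
Sizing note: each page receives two scratch buffers (definition and repetition) of `LEVEL_DECODE_BUF_SIZE` (2048) entries each, so with `level_type_size == 2` that is 2048 × 2 × 2 = 8192 bytes per page, or roughly 80 MB for a 10,000-page file; hence the TODO about skipping dictionary pages and pages with no repetition data.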
```diff
@@ -776,7 +813,7 @@ void reader::impl::load_and_decompress_data(
   auto& raw_page_data    = _file_itm_data.raw_page_data;
   auto& decomp_page_data = _file_itm_data.decomp_page_data;
   auto& chunks           = _file_itm_data.chunks;
-  auto& pages_info       = _file_itm_data.pages_info;
+  auto& pages            = _file_itm_data.pages_info;
 
   auto const [has_compressed_data, read_rowgroup_tasks] =
     create_and_read_column_chunks(row_groups_info, num_rows);
@@ -787,13 +824,13 @@
 
   // Process dataset chunk pages into output columns
   auto const total_pages = count_page_headers(chunks, _stream);
-  pages_info = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);
+  pages = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);
 
   if (total_pages > 0) {
     // decoding of column/page information
-    decode_page_headers(chunks, pages_info, _stream);
+    _file_itm_data.level_type_size = decode_page_headers(chunks, pages, _stream);
     if (has_compressed_data) {
-      decomp_page_data = decompress_page_data(chunks, pages_info, _stream);
+      decomp_page_data = decompress_page_data(chunks, pages, _stream);
       // Free compressed data
       for (size_t c = 0; c < chunks.size(); c++) {
         if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); }
@@ -815,9 +852,17 @@
     // create it ourselves.
     // std::vector<output_column_info> output_info = build_output_column_info();
 
-    // nesting information (sizes, etc) stored -per page-
-    // note : even for flat schemas, we allocate 1 level of "nesting" info
-    allocate_nesting_info();
+    // the following two allocate functions modify the page data
+    pages.device_to_host(_stream, true);
+    {
+      // nesting information (sizes, etc) stored -per page-
+      // note : even for flat schemas, we allocate 1 level of "nesting" info
+      allocate_nesting_info();
+
+      // level decode space
+      allocate_level_decode_space();
+    }
+    pages.host_to_device(_stream);
   }
 }
 
@@ -1575,6 +1620,7 @@ void reader::impl::preprocess_pages(size_t skip_rows,
                     std::numeric_limits<size_t>::max(),
                     true,                  // compute num_rows
                     chunk_read_limit > 0,  // compute string sizes
+                    _file_itm_data.level_type_size,
                     _stream);
 
   // computes:
@@ -1626,6 +1672,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
                     num_rows,
                     false,  // num_rows is already computed
                     false,  // no need to compute string sizes
+                    _file_itm_data.level_type_size,
                     _stream);
 
   // print_pages(pages, _stream);
```