Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization to decoding of parquet level streams #13203

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/include/cudf/detail/utilities/integer_utils.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* Copyright 2019 BlazingDB, Inc.
* Copyright 2019 Eyal Rozenberg <[email protected]>
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +44,7 @@ namespace util {
* `modulus` is positive. The safety is in regard to rollover.
*/
template <typename S>
S round_up_safe(S number_to_round, S modulus)
constexpr S round_up_safe(S number_to_round, S modulus)
{
auto remainder = number_to_round % modulus;
if (remainder == 0) { return number_to_round; }
Expand All @@ -67,7 +67,7 @@ S round_up_safe(S number_to_round, S modulus)
* `modulus` is positive and does not check for overflow.
*/
template <typename S>
S round_down_safe(S number_to_round, S modulus) noexcept
constexpr S round_down_safe(S number_to_round, S modulus) noexcept
{
auto remainder = number_to_round % modulus;
auto rounded_down = number_to_round - remainder;
Expand Down
447 changes: 270 additions & 177 deletions cpp/src/io/parquet/page_data.cu

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -365,9 +365,11 @@ __global__ void __launch_bounds__(128)
// this computation is only valid for flat schemas. for nested schemas,
// they will be recomputed in the preprocess step by examining repetition and
// definition levels
vuule marked this conversation as resolved.
Show resolved Hide resolved
bs->page.chunk_row = 0;
bs->page.num_rows = 0;
bs->page.str_bytes = 0;
bs->page.chunk_row = 0;
bs->page.num_rows = 0;
bs->page.skipped_values = -1;
bs->page.skipped_leaf_values = 0;
bs->page.str_bytes = 0;
}
num_values = bs->ck.num_values;
page_info = bs->ck.page_info;
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ constexpr int MAX_DICT_BITS = 24;
// Total number of unsigned 24 bit values
constexpr size_type MAX_DICT_SIZE = (1 << MAX_DICT_BITS) - 1;

// level decode buffer size.
constexpr int LEVEL_DECODE_BUF_SIZE = 2048;

/**
* @brief Struct representing an input column in the file.
*/
Expand Down Expand Up @@ -193,6 +196,9 @@ struct PageInfo {
int32_t nesting_info_size;
PageNestingInfo* nesting;
PageNestingDecodeInfo* nesting_decode;

// level decode buffers
uint8_t* lvl_decode_buf[level_type::NUM_LEVEL_TYPES];
};

/**
Expand Down Expand Up @@ -284,6 +290,9 @@ struct file_intermediate_data {
hostdevice_vector<gpu::PageInfo> pages_info{};
hostdevice_vector<gpu::PageNestingInfo> page_nesting_info{};
hostdevice_vector<gpu::PageNestingDecodeInfo> page_nesting_decode_info{};

rmm::device_buffer level_decode_data;
vuule marked this conversation as resolved.
Show resolved Hide resolved
int level_type_size;
};

/**
Expand Down Expand Up @@ -451,6 +460,7 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
* computed
* @param compute_string_sizes If set to true, the str_bytes field in PageInfo will
* be computed
* @param level_type_size Size in bytes of the type for level decoding
* @param stream CUDA stream to use, default 0
*/
void ComputePageSizes(hostdevice_vector<PageInfo>& pages,
Expand All @@ -459,6 +469,7 @@ void ComputePageSizes(hostdevice_vector<PageInfo>& pages,
size_t num_rows,
bool compute_num_rows,
bool compute_string_sizes,
int level_type_size,
rmm::cuda_stream_view stream);

/**
Expand All @@ -471,12 +482,14 @@ void ComputePageSizes(hostdevice_vector<PageInfo>& pages,
* @param[in] chunks All chunks to be decoded
* @param[in] num_rows Total number of rows to read
* @param[in] min_row Minimum number of rows to read
* @param[in] level_type_size Size in bytes of the type for level decoding
* @param[in] stream CUDA stream to use, default 0
*/
void DecodePageData(hostdevice_vector<PageInfo>& pages,
hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
rmm::cuda_stream_view stream);

/**
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_valids.host_to_device(_stream);
chunk_nested_data.host_to_device(_stream);

gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _stream);
gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _file_itm_data.level_type_size, _stream);

pages.device_to_host(_stream);
page_nesting.device_to_host(_stream);
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ class reader::impl {
*/
void allocate_nesting_info();

/**
* @brief Allocate space for use when decoding definition/repetition levels.
*
* One large contiguous buffer of data allocated and
* distributed among the PageInfo structs.
*/
void allocate_level_decode_space();

/**
* @brief Read a chunk of data and return an output table.
*
Expand Down
79 changes: 63 additions & 16 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,11 @@ constexpr bool is_supported_encoding(Encoding enc)
* @param chunks List of column chunk descriptors
* @param pages List of page information
* @param stream CUDA stream used for device memory operations and kernel launches
* @returns The size in bytes of level type data required
*/
void decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
hostdevice_vector<gpu::PageInfo>& pages,
rmm::cuda_stream_view stream)
int decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
hostdevice_vector<gpu::PageInfo>& pages,
rmm::cuda_stream_view stream)
{
// IMPORTANT : if you change how pages are stored within a chunk (dist pages, then data pages),
// please update preprocess_nested_columns to reflect this.
Expand All @@ -340,13 +341,31 @@ void decode_page_headers(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,

chunks.host_to_device(stream);
gpu::DecodePageHeaders(chunks.device_ptr(), chunks.size(), stream);

// compute max bytes needed for level data
auto level_bit_size =
cudf::detail::make_counting_transform_iterator(0, [chunks = chunks.begin()] __device__(int i) {
auto c = chunks[i];
return static_cast<int>(std::max(c.level_bits[gpu::level_type::REPETITION],
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
c.level_bits[gpu::level_type::DEFINITION]));
});
// max level data bit size.
int const max_level_bits = thrust::reduce(rmm::exec_policy(stream),
level_bit_size,
level_bit_size + chunks.size(),
0,
thrust::maximum<int>());
auto const level_type_size = max(1, cudf::util::div_rounding_up_safe(max_level_bits, 8));
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved

pages.device_to_host(stream, true);

// validate page encodings
CUDF_EXPECTS(std::all_of(pages.begin(),
pages.end(),
[](auto const& page) { return is_supported_encoding(page.encoding); }),
"Unsupported page encoding detected");

return level_type_size;
}

/**
Expand Down Expand Up @@ -565,9 +584,6 @@ void reader::impl::allocate_nesting_info()
page_nesting_decode_info =
hostdevice_vector<gpu::PageNestingDecodeInfo>{total_page_nesting_infos, _stream};

// retrieve from the gpu so we can update
pages.device_to_host(_stream, true);

// update pointers in the PageInfos
int target_page_index = 0;
int src_info_index = 0;
Expand All @@ -593,9 +609,6 @@ void reader::impl::allocate_nesting_info()
target_page_index += chunks[idx].num_data_pages;
}

// copy back to the gpu
pages.host_to_device(_stream);

// fill in
int nesting_info_index = 0;
std::map<int, std::pair<std::vector<int>, std::vector<int>>> depth_remapping;
Expand Down Expand Up @@ -673,6 +686,30 @@ void reader::impl::allocate_nesting_info()
page_nesting_decode_info.host_to_device(_stream);
}

void reader::impl::allocate_level_decode_space()
{
  auto& pages = _file_itm_data.pages_info;

  // TODO: this could be made smaller if we ignored dictionary pages and pages with no
  // repetition data.
  // Bytes consumed by one level stream (definition OR repetition) for one page.
  size_t const bytes_per_stream = LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size;
  // Each page gets two streams: definition levels followed by repetition levels.
  size_t const bytes_per_page = bytes_per_stream * 2;
  _file_itm_data.level_decode_data = rmm::device_buffer(
    bytes_per_page * pages.size(), _stream, rmm::mr::get_current_device_resource());

  // carve the single contiguous allocation into per-page slices
  auto* cursor = static_cast<uint8_t*>(_file_itm_data.level_decode_data.data());
  for (auto& page : pages) {
    page.lvl_decode_buf[gpu::level_type::DEFINITION] = cursor;
    cursor += bytes_per_stream;
    page.lvl_decode_buf[gpu::level_type::REPETITION] = cursor;
    cursor += bytes_per_stream;
  }
}

std::pair<bool, std::vector<std::future<void>>> reader::impl::create_and_read_column_chunks(
cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
{
Expand Down Expand Up @@ -776,7 +813,7 @@ void reader::impl::load_and_decompress_data(
auto& raw_page_data = _file_itm_data.raw_page_data;
auto& decomp_page_data = _file_itm_data.decomp_page_data;
auto& chunks = _file_itm_data.chunks;
auto& pages_info = _file_itm_data.pages_info;
auto& pages = _file_itm_data.pages_info;

auto const [has_compressed_data, read_rowgroup_tasks] =
create_and_read_column_chunks(row_groups_info, num_rows);
Expand All @@ -787,13 +824,13 @@ void reader::impl::load_and_decompress_data(

// Process dataset chunk pages into output columns
auto const total_pages = count_page_headers(chunks, _stream);
pages_info = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);
pages = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);

if (total_pages > 0) {
// decoding of column/page information
decode_page_headers(chunks, pages_info, _stream);
_file_itm_data.level_type_size = decode_page_headers(chunks, pages, _stream);
if (has_compressed_data) {
decomp_page_data = decompress_page_data(chunks, pages_info, _stream);
decomp_page_data = decompress_page_data(chunks, pages, _stream);
// Free compressed data
for (size_t c = 0; c < chunks.size(); c++) {
if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); }
Expand All @@ -815,9 +852,17 @@ void reader::impl::load_and_decompress_data(
// create it ourselves.
// std::vector<output_column_info> output_info = build_output_column_info();

// nesting information (sizes, etc) stored -per page-
// note : even for flat schemas, we allocate 1 level of "nesting" info
allocate_nesting_info();
// the following two allocate functions modify the page data
pages.device_to_host(_stream, true);
{
// nesting information (sizes, etc) stored -per page-
// note : even for flat schemas, we allocate 1 level of "nesting" info
allocate_nesting_info();

// level decode space
allocate_level_decode_space();
}
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
pages.host_to_device(_stream);
}
}

Expand Down Expand Up @@ -1575,6 +1620,7 @@ void reader::impl::preprocess_pages(size_t skip_rows,
std::numeric_limits<size_t>::max(),
true, // compute num_rows
chunk_read_limit > 0, // compute string sizes
_file_itm_data.level_type_size,
_stream);

// computes:
Expand Down Expand Up @@ -1626,6 +1672,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
num_rows,
false, // num_rows is already computed
false, // no need to compute string sizes
_file_itm_data.level_type_size,
_stream);

// print_pages(pages, _stream);
Expand Down
Loading