Compute column sizes in Parquet preprocess with single kernel (#12931)
Addresses #11922 

Currently, Parquet preprocessing performs a `thrust::reduce()` and a `thrust::exclusive_scan_by_key()` per nested column to compute that column's size and offsets. For complicated schemas this results in a large number of kernel invocations. This PR instead calculates the sizes and offsets of all columns with a single call each to `thrust::reduce_by_key()` and `thrust::exclusive_scan_by_key()`.
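
As a rough illustration of the batching idea, here is a minimal self-contained sketch (not the PR's code: the `key_of` functor, the toy sizes, and the constants are placeholders for demonstration). All (column, depth) pairs are laid out along one flattened axis with the page index varying fastest, so the key `index / num_pages` identifies each (column, depth) segment, and one `reduce_by_key` plus one segmented `exclusive_scan_by_key` stand in for the per-column launches:

```cpp
// Minimal sketch of the single-kernel batching idea (not the PR's code).
// Flat layout: index = (col * max_depth + depth) * num_pages + page, so the
// page index varies fastest and key = index / num_pages names the segment.
#include <cstddef>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/reduce.h>
#include <thrust/scan.h>

struct key_of {  // segment id of a flat index: one segment per (column, depth)
  std::size_t num_pages;
  __host__ __device__ std::size_t operator()(std::size_t i) const { return i / num_pages; }
};

int main()
{
  std::size_t const num_cols = 2, max_depth = 2, num_pages = 4;
  std::size_t const num_keys = num_cols * max_depth * num_pages;

  // One value per (column, depth, page); all 1s so the results are obvious.
  thrust::device_vector<int> size_input(num_keys, 1);

  auto keys = thrust::make_transform_iterator(
    thrust::make_counting_iterator<std::size_t>(0), key_of{num_pages});

  // One launch reduces every (column, depth) segment: each total is 4 here.
  thrust::device_vector<int> sizes(num_cols * max_depth);
  thrust::reduce_by_key(
    keys, keys + num_keys, size_input.begin(), thrust::make_discard_iterator(), sizes.begin());

  // One launch computes per-page start offsets within each segment: 0,1,2,3.
  thrust::device_vector<int> offsets(num_keys);
  thrust::exclusive_scan_by_key(keys, keys + num_keys, size_input.begin(), offsets.begin());

  return 0;
}
```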

This change results in around a 1.3x speedup when reading a complicated schema.
Before:
![image](https://user-images.githubusercontent.com/26264495/224823213-ae998654-274c-450a-8ad7-ea854541335e.png)

After:
![image](https://user-images.githubusercontent.com/26264495/224823108-cb91c380-5e35-4c77-a6f9-6703e321be05.png)

Authors:
  - Srikar Vanavasam (https://github.com/SrikarVanavasam)

Approvers:
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #12931
SrikarVanavasam authored Apr 7, 2023
1 parent 46b5900 commit f328b64
Showing 1 changed file with 120 additions and 51 deletions.
cpp/src/io/parquet/reader_impl_preprocess.cu (120 additions & 51 deletions)

@@ -1215,24 +1215,60 @@ struct get_page_schema {
   __device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; }
 };

+struct input_col_info {
+  int const schema_idx;
+  size_type const nesting_depth;
+};
+
+/**
+ * @brief Converts a 1-dimensional index into page, depth and column indices used in
+ * allocate_columns to compute columns sizes.
+ *
+ * The input index will iterate through pages, nesting depth and column indices in that order.
+ */
+struct reduction_indices {
+  size_t const page_idx;
+  size_type const depth_idx;
+  size_type const col_idx;
+
+  __device__ reduction_indices(size_t index_, size_type max_depth_, size_t num_pages_)
+    : page_idx(index_ % num_pages_),
+      depth_idx((index_ / num_pages_) % max_depth_),
+      col_idx(index_ / (max_depth_ * num_pages_))
+  {
+  }
+};
+
 /**
  * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema.
  */
 struct get_page_nesting_size {
-  size_type const src_col_schema;
-  size_type const depth;
+  input_col_info const* const input_cols;
+  size_type const max_depth;
+  size_t const num_pages;
   gpu::PageInfo const* const pages;
+  int const* page_indices;

-  __device__ size_type operator()(int index) const
+  __device__ size_type operator()(size_t index) const
   {
-    auto const& page = pages[index];
-    if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
+    auto const indices = reduction_indices{index, max_depth, num_pages};
+
+    auto const& page = pages[page_indices[indices.page_idx]];
+    if (page.src_col_schema != input_cols[indices.col_idx].schema_idx ||
+        page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
+        indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
       return 0;
     }
-    return page.nesting[depth].batch_size;
+
+    return page.nesting[indices.depth_idx].batch_size;
   }
 };

+struct get_reduction_key {
+  size_t const num_pages;
+  __device__ size_t operator()(size_t index) const { return index / num_pages; }
+};
+
 /**
  * @brief Writes to the chunk_row field of the PageInfo struct.
  */
@@ -1259,11 +1295,12 @@ struct chunk_row_output_iter {
  * @brief Writes to the page_start_value field of the PageNestingInfo struct, keyed by schema.
  */
 struct start_offset_output_iterator {
-  gpu::PageInfo* pages;
+  gpu::PageInfo const* pages;
   int const* page_indices;
-  int cur_index;
-  int src_col_schema;
-  int nesting_depth;
+  size_t cur_index;
+  input_col_info const* input_cols;
+  size_type max_depth;
+  size_t num_pages;
   int empty = 0;
   using value_type = size_type;
   using difference_type = size_type;
@@ -1273,32 +1310,37 @@

   constexpr void operator=(start_offset_output_iterator const& other)
   {
-    pages          = other.pages;
-    page_indices   = other.page_indices;
-    cur_index      = other.cur_index;
-    src_col_schema = other.src_col_schema;
-    nesting_depth  = other.nesting_depth;
+    pages        = other.pages;
+    page_indices = other.page_indices;
+    cur_index    = other.cur_index;
+    input_cols   = other.input_cols;
+    max_depth    = other.max_depth;
+    num_pages    = other.num_pages;
   }

-  constexpr start_offset_output_iterator operator+(int i)
+  constexpr start_offset_output_iterator operator+(size_t i)
   {
     return start_offset_output_iterator{
-      pages, page_indices, cur_index + i, src_col_schema, nesting_depth};
+      pages, page_indices, cur_index + i, input_cols, max_depth, num_pages};
   }

   constexpr void operator++() { cur_index++; }

-  __device__ reference operator[](int i) { return dereference(cur_index + i); }
+  __device__ reference operator[](size_t i) { return dereference(cur_index + i); }
   __device__ reference operator*() { return dereference(cur_index); }

  private:
-  __device__ reference dereference(int index)
+  __device__ reference dereference(size_t index)
   {
-    gpu::PageInfo const& p = pages[page_indices[index]];
-    if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
+    auto const indices = reduction_indices{index, max_depth, num_pages};
+
+    gpu::PageInfo const& p = pages[page_indices[indices.page_idx]];
+    if (p.src_col_schema != input_cols[indices.col_idx].schema_idx ||
+        p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
+        indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
       return empty;
     }
-    return p.nesting_decode[nesting_depth].page_start_value;
+    return p.nesting_decode[indices.depth_idx].page_start_value;
   }
 };

@@ -1620,52 +1662,79 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses

   // compute output column sizes by examining the pages of the -input- columns
   if (has_lists) {
-    auto& page_keys  = _chunk_itm_data.page_keys;
     auto& page_index = _chunk_itm_data.page_index;
-    for (size_t idx = 0; idx < _input_columns.size(); idx++) {
-      auto const& input_col = _input_columns[idx];
-      auto src_col_schema   = input_col.schema_idx;
-      size_t max_depth      = input_col.nesting_depth();
-
-      auto* cols = &_output_buffers;
-      for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
+    std::vector<input_col_info> h_cols_info;
+    h_cols_info.reserve(_input_columns.size());
+    std::transform(_input_columns.cbegin(),
+                   _input_columns.cend(),
+                   std::back_inserter(h_cols_info),
+                   [](auto& col) -> input_col_info {
+                     return {col.schema_idx, static_cast<size_type>(col.nesting_depth())};
+                   });
+
+    auto const max_depth =
+      (*std::max_element(h_cols_info.cbegin(),
+                         h_cols_info.cend(),
+                         [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; }))
+        .nesting_depth;
+
+    auto const d_cols_info = cudf::detail::make_device_uvector_async(
+      h_cols_info, _stream, rmm::mr::get_current_device_resource());
+
+    auto const num_keys = _input_columns.size() * max_depth * pages.size();
+    // size iterator. indexes pages by sorted order
+    rmm::device_uvector<size_type> size_input{num_keys, _stream};
+    thrust::transform(
+      rmm::exec_policy(_stream),
+      thrust::make_counting_iterator<size_type>(0),
+      thrust::make_counting_iterator<size_type>(num_keys),
+      size_input.begin(),
+      get_page_nesting_size{
+        d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()});
+    auto const reduction_keys =
+      cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()});
+    hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};
+
+    // find the size of each column
+    thrust::reduce_by_key(rmm::exec_policy(_stream),
+                          reduction_keys,
+                          reduction_keys + num_keys,
+                          size_input.cbegin(),
+                          thrust::make_discard_iterator(),
+                          sizes.d_begin());
+
+    // for nested hierarchies, compute per-page start offset
+    thrust::exclusive_scan_by_key(
+      rmm::exec_policy(_stream),
+      reduction_keys,
+      reduction_keys + num_keys,
+      size_input.cbegin(),
+      start_offset_output_iterator{
+        pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()});
+
+    sizes.device_to_host(_stream, true);
+    for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
+      auto const& input_col = _input_columns[idx];
+      auto* cols            = &_output_buffers;
+      for (size_type l_idx = 0; l_idx < static_cast<size_type>(input_col.nesting_depth());
+           l_idx++) {
         auto& out_buf = (*cols)[input_col.nesting[l_idx]];
         cols          = &out_buf.children;

-        // size iterator. indexes pages by sorted order
-        auto size_input = thrust::make_transform_iterator(
-          page_index.begin(),
-          get_page_nesting_size{src_col_schema, static_cast<size_type>(l_idx), pages.device_ptr()});
-
         // if this buffer is part of a list hierarchy, we need to determine it's
         // final size and allocate it here.
         //
         // for struct columns, higher levels of the output columns are shared between input
         // columns. so don't compute any given level more than once.
         if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) {
-          int size =
-            thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size());
+          auto size = sizes[(idx * max_depth) + l_idx];

           // if this is a list column add 1 for non-leaf levels for the terminating offset
           if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }

           // allocate
           out_buf.create(size, _stream, _mr);
         }
-
-        // for nested hierarchies, compute per-page start offset
-        if (input_col.has_repetition) {
-          thrust::exclusive_scan_by_key(
-            rmm::exec_policy(_stream),
-            page_keys.begin(),
-            page_keys.end(),
-            size_input,
-            start_offset_output_iterator{pages.device_ptr(),
-                                         page_index.begin(),
-                                         0,
-                                         static_cast<int>(src_col_schema),
-                                         static_cast<int>(l_idx)});
-        }
       }
     }
   }
