Compute column sizes in Parquet preprocess with single kernel #12931

Merged · 19 commits · Apr 7, 2023

Changes from 16 commits
169 changes: 118 additions & 51 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1215,24 +1215,60 @@ struct get_page_schema {
   __device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; }
 };
 
+struct input_col_info {
+  int const schema_idx;
+  size_type const nesting_depth;
+};
+
+/**
+ * @brief Converts a 1-dimensional index into page, depth and column indices used in
+ * allocate_columns to compute column sizes.
+ *
+ * The input index iterates through pages, nesting depth and column indices, in that order.
+ */
+struct reduction_indices {
+  size_t const page_idx;
+  size_type const depth_idx;
+  size_type const col_idx;
+
+  __device__ reduction_indices(size_t index_, size_type max_depth_, size_t num_pages_)
+    : page_idx(index_ % num_pages_),
+      depth_idx((index_ / num_pages_) % max_depth_),
+      col_idx(index_ / (max_depth_ * num_pages_))
+  {
+  }
+};
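The constructor's modulo/divide chain decodes a flat index with pages varying fastest, then nesting depth, then column. A minimal host-side sketch of the same decomposition (plain C++ with the `__device__` qualifier dropped and made-up counts, for illustration only):

```cpp
#include <cassert>
#include <cstddef>

// Host mirror of reduction_indices, for illustration only.
struct indices3d {
  size_t page_idx;
  int depth_idx;
  int col_idx;

  indices3d(size_t index, int max_depth, size_t num_pages)
    : page_idx(index % num_pages),
      depth_idx(static_cast<int>((index / num_pages) % max_depth)),
      col_idx(static_cast<int>(index / (max_depth * num_pages)))
  {
  }
};

int main()
{
  size_t const num_pages = 3;  // hypothetical counts
  int const max_depth    = 2;
  // The flat index space holds num_cols * max_depth * num_pages entries.
  assert((indices3d{2, max_depth, num_pages}.page_idx == 2));   // column 0, depth 0, page 2
  assert((indices3d{3, max_depth, num_pages}.depth_idx == 1));  // wrapped into depth 1
  assert((indices3d{6, max_depth, num_pages}.col_idx == 1));    // wrapped into column 1
  return 0;
}
```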

 /**
  * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema.
  */
 struct get_page_nesting_size {
-  size_type const src_col_schema;
-  size_type const depth;
+  input_col_info const* const input_cols;
+  size_type const max_depth;
+  size_t const num_pages;
   gpu::PageInfo const* const pages;
+  int const* page_indices;

-  __device__ size_type operator()(int index) const
+  __device__ size_type operator()(size_t index) const
   {
-    auto const& page = pages[index];
-    if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
+    auto const indices = reduction_indices{index, max_depth, num_pages};
+
+    auto const& page = pages[page_indices[indices.page_idx]];
+    if (page.src_col_schema != input_cols[indices.col_idx].schema_idx ||
+        page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
+        indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
       return 0;
     }
-    return page.nesting[depth].batch_size;
+
+    return page.nesting[indices.depth_idx].batch_size;
   }
 };
 
+struct get_reduction_key {
+  size_t const num_pages;
+  __device__ size_t operator()(size_t index) const { return index / num_pages; }
+};

 /**
  * @brief Writes to the chunk_row field of the PageInfo struct.
  */
@@ -1259,11 +1295,12 @@
  * @brief Writes to the page_start_value field of the PageNestingInfo struct, keyed by schema.
  */
 struct start_offset_output_iterator {
-  gpu::PageInfo* pages;
+  gpu::PageInfo const* pages;
   int const* page_indices;
-  int cur_index;
-  int src_col_schema;
-  int nesting_depth;
+  size_t cur_index;
+  input_col_info const* input_cols;
+  size_type max_depth;
+  size_t num_pages;
   int empty = 0;
   using value_type = size_type;
   using difference_type = size_type;
@@ -1273,32 +1310,37 @@
 
   constexpr void operator=(start_offset_output_iterator const& other)
   {
-    pages          = other.pages;
-    page_indices   = other.page_indices;
-    cur_index      = other.cur_index;
-    src_col_schema = other.src_col_schema;
-    nesting_depth  = other.nesting_depth;
+    pages        = other.pages;
+    page_indices = other.page_indices;
+    cur_index    = other.cur_index;
+    input_cols   = other.input_cols;
+    max_depth    = other.max_depth;
+    num_pages    = other.num_pages;
   }
 
-  constexpr start_offset_output_iterator operator+(int i)
+  constexpr start_offset_output_iterator operator+(size_t i)
   {
     return start_offset_output_iterator{
-      pages, page_indices, cur_index + i, src_col_schema, nesting_depth};
+      pages, page_indices, cur_index + i, input_cols, max_depth, num_pages};
   }
 
   constexpr void operator++() { cur_index++; }
 
-  __device__ reference operator[](int i) { return dereference(cur_index + i); }
+  __device__ reference operator[](size_t i) { return dereference(cur_index + i); }
   __device__ reference operator*() { return dereference(cur_index); }
 
  private:
-  __device__ reference dereference(int index)
+  __device__ reference dereference(size_t index)
   {
-    gpu::PageInfo const& p = pages[page_indices[index]];
-    if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
+    auto const indices = reduction_indices{index, max_depth, num_pages};
+
+    gpu::PageInfo const& p = pages[page_indices[indices.page_idx]];
+    if (p.src_col_schema != input_cols[indices.col_idx].schema_idx ||
+        p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
+        indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
       return empty;
     }
-    return p.nesting_decode[nesting_depth].page_start_value;
+    return p.nesting_decode[indices.depth_idx].page_start_value;
   }
 };
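This iterator is the destination of the `exclusive_scan_by_key` in `allocate_columns` below: within each (column, depth) key group, the exclusive scan of page sizes yields each page's starting row offset, and `dereference` routes that value into the right `PageNestingInfo` (or into `empty` for pages that don't belong to the column). Here is the scan semantics in isolation, on toy data rather than the real page array:

```cpp
#include <thrust/device_vector.h>
#include <thrust/scan.h>
#include <vector>

int main()
{
  // Two (column, depth) key groups, three pages each.
  std::vector<int> h_keys{0, 0, 0, 1, 1, 1};
  std::vector<int> h_sizes{4, 2, 5, 3, 3, 1};  // per-page batch sizes
  thrust::device_vector<int> keys(h_keys.begin(), h_keys.end());
  thrust::device_vector<int> sizes(h_sizes.begin(), h_sizes.end());
  thrust::device_vector<int> offsets(sizes.size());

  // The running sum restarts at 0 whenever the key changes.
  thrust::exclusive_scan_by_key(keys.begin(), keys.end(), sizes.begin(), offsets.begin());
  // offsets now holds {0, 4, 6, 0, 3, 6}: each page's start offset within its group.
  return 0;
}
```

In the real call, the dense `offsets` vector is replaced by `start_offset_output_iterator`, so each scanned value lands directly in `page_start_value` rather than a temporary buffer.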

@@ -1620,52 +1662,77 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
 
   // compute output column sizes by examining the pages of the -input- columns
   if (has_lists) {
-    auto& page_keys  = _chunk_itm_data.page_keys;
     auto& page_index = _chunk_itm_data.page_index;
-    for (size_t idx = 0; idx < _input_columns.size(); idx++) {
-      auto const& input_col = _input_columns[idx];
-      auto src_col_schema   = input_col.schema_idx;
-      size_t max_depth      = input_col.nesting_depth();
-
-      auto* cols = &_output_buffers;
-      for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
+
+    std::vector<input_col_info> h_cols_info;
+    h_cols_info.reserve(_input_columns.size());
+    std::transform(_input_columns.cbegin(),
+                   _input_columns.cend(),
+                   std::back_inserter(h_cols_info),
+                   [](auto& col) -> input_col_info {
+                     return {col.schema_idx, static_cast<size_type>(col.nesting_depth())};
+                   });
+
+    auto const max_depth =
+      (*std::max_element(h_cols_info.cbegin(),
+                         h_cols_info.cend(),
+                         [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; }))
+        .nesting_depth;
+
+    auto const d_cols_info = cudf::detail::make_device_uvector_async(
+      h_cols_info, _stream, rmm::mr::get_current_device_resource());
+
+    // size iterator. indexes pages by sorted order
+    auto const size_input = cudf::detail::make_counting_transform_iterator(
+      0,
+      get_page_nesting_size{
+        d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()});
+
+    auto const reduction_keys =
+      cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()});
+
+    hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};
+    auto const num_keys = _input_columns.size() * max_depth * pages.size();
+
+    // find the size of each column
+    thrust::reduce_by_key(rmm::exec_policy(_stream),
+                          reduction_keys,
+                          reduction_keys + num_keys,
+                          size_input,
+                          thrust::make_discard_iterator(),
+                          sizes.d_begin());
+
+    // for nested hierarchies, compute per-page start offset
+    thrust::exclusive_scan_by_key(
+      rmm::exec_policy(_stream),
+      reduction_keys,
+      reduction_keys + num_keys,
+      size_input,
+      start_offset_output_iterator{
+        pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()});
+
+    sizes.device_to_host(_stream, true);
+    for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
+      auto const& input_col = _input_columns[idx];
+      auto* cols            = &_output_buffers;
+      for (size_type l_idx = 0; l_idx < static_cast<size_type>(input_col.nesting_depth());
+           l_idx++) {
         auto& out_buf = (*cols)[input_col.nesting[l_idx]];
         cols          = &out_buf.children;
 
-        // size iterator. indexes pages by sorted order
-        auto size_input = thrust::make_transform_iterator(
-          page_index.begin(),
-          get_page_nesting_size{src_col_schema, static_cast<size_type>(l_idx), pages.device_ptr()});
-
         // if this buffer is part of a list hierarchy, we need to determine its
         // final size and allocate it here.
         //
         // for struct columns, higher levels of the output columns are shared between input
         // columns. so don't compute any given level more than once.
         if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) {
-          int size =
-            thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size());
+          auto size = sizes[(idx * max_depth) + l_idx];
 
           // if this is a list column add 1 for non-leaf levels for the terminating offset
           if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }
 
           // allocate
           out_buf.create(size, _stream, _mr);
         }
-
-        // for nested hierarchies, compute per-page start offset
-        if (input_col.has_repetition) {
-          thrust::exclusive_scan_by_key(
-            rmm::exec_policy(_stream),
-            page_keys.begin(),
-            page_keys.end(),
-            size_input,
-            start_offset_output_iterator{pages.device_ptr(),
-                                         page_index.begin(),
-                                         0,
-                                         static_cast<int>(src_col_schema),
-                                         static_cast<int>(l_idx)});
-        }
       }
     }
   }
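The hunk above is the heart of the PR: the old code launched one `thrust::reduce` per column and nesting level (plus a per-column `exclusive_scan_by_key` for repeated columns), while the new code flattens columns × depths × pages into one index space and computes every size with a single `reduce_by_key` and every start offset with a single `exclusive_scan_by_key`. A condensed sketch of the fused pattern, using hypothetical stand-ins (`flat_size`, `flat_key`, a plain size table) rather than the real cudf types:

```cpp
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/reduce.h>

// Hypothetical stand-in for get_page_nesting_size: reads a flattened
// [column][depth][page] size table instead of real PageInfo structs.
struct flat_size {
  int const* page_sizes;
  __host__ __device__ int operator()(size_t i) const { return page_sizes[i]; }
};

struct flat_key {  // same shape as get_reduction_key
  size_t num_pages;
  __host__ __device__ size_t operator()(size_t i) const { return i / num_pages; }
};

int main()
{
  size_t const num_pages = 4;
  int const num_cols = 3, max_depth = 2;
  size_t const num_keys = num_cols * max_depth * num_pages;

  thrust::device_vector<int> page_sizes(num_keys, 1);      // pretend each page adds 1 row
  thrust::device_vector<int> sizes(num_cols * max_depth);  // one size per (column, depth)

  auto idx  = thrust::make_counting_iterator<size_t>(0);
  auto vals = thrust::make_transform_iterator(idx, flat_size{page_sizes.data().get()});
  auto keys = thrust::make_transform_iterator(idx, flat_key{num_pages});

  // One launch computes all num_cols * max_depth sizes (each equals 4 here).
  thrust::reduce_by_key(thrust::device,
                        keys,
                        keys + num_keys,
                        vals,
                        thrust::make_discard_iterator(),
                        sizes.begin());
  return 0;
}
```

The trade-off is some wasted work: pages that don't match a given (column, depth) pair still occupy slots in the flattened index space and contribute zeros to the sums, but one launch over a larger input generally beats many small per-column launches.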