Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compute column sizes in Parquet preprocess with single kernel #12931

Merged
merged 19 commits into from
Apr 7, 2023
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 102 additions & 43 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1211,24 +1211,59 @@ struct get_page_schema {
__device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; }
};

/**
 * @brief Schema index and nesting depth of a single input column.
 *
 * Copied to the device so that size-computation functors can match pages
 * (which are keyed by src_col_schema) to input columns and bound the
 * per-column nesting-depth loop.
 */
struct input_col_info {
int schema_idx;
size_t nesting_depth;
};

/**
* @brief Converts a 1-dimensional index into page, depth and column indices used in
* allocate_columns to compute columns sizes. The input index will iterate through pages, nesting depth and
* column indices in that order.
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
*/
struct get_indices {
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
size_t page_idx;
size_t depth_idx;
size_t col_idx;
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved

__device__ get_indices(int index, size_t max_depth, size_t num_pages)
: page_idx(index % num_pages),
depth_idx((index / num_pages) % max_depth),
col_idx(index / (max_depth * num_pages))
{
}
};

/**
* @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema.
*/
struct get_page_nesting_size {
size_type const src_col_schema;
size_type const depth;
input_col_info* input_cols;
size_t max_depth;
size_t num_pages;
gpu::PageInfo const* const pages;
int const* page_indices;
vuule marked this conversation as resolved.
Show resolved Hide resolved

__device__ size_type operator()(int index) const
{
auto const& page = pages[index];
if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
auto indices = get_indices{index, max_depth, num_pages};

auto const& page = pages[page_indices[indices.page_idx]];
if (page.src_col_schema != input_cols[indices.col_idx].schema_idx ||
page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
return 0;
}
return page.nesting[depth].batch_size;

return page.nesting[indices.depth_idx].batch_size;
}
};

/**
 * @brief Maps a flat (column, depth, page) index to its (column, depth)
 * reduction key.
 *
 * Pages are the fastest-varying dimension of the flat index, so dividing by
 * the page count collapses all pages of one (column, depth) pair onto a
 * single key for reduce_by_key / exclusive_scan_by_key.
 */
struct get_reduction_key {
  size_t num_pages;
  // const-qualified: the functor holds no mutable state
  __device__ size_type operator()(int index) const { return index / num_pages; }
};

/**
* @brief Writes to the chunk_row field of the PageInfo struct.
*/
Expand Down Expand Up @@ -1258,8 +1293,9 @@ struct start_offset_output_iterator {
gpu::PageInfo* pages;
int const* page_indices;
int cur_index;
int src_col_schema;
int nesting_depth;
input_col_info* input_cols;
size_t max_depth;
size_t num_pages;
int empty = 0;
using value_type = size_type;
using difference_type = size_type;
Expand All @@ -1269,17 +1305,18 @@ struct start_offset_output_iterator {

constexpr void operator=(start_offset_output_iterator const& other)
{
pages = other.pages;
page_indices = other.page_indices;
cur_index = other.cur_index;
src_col_schema = other.src_col_schema;
nesting_depth = other.nesting_depth;
pages = other.pages;
page_indices = other.page_indices;
cur_index = other.cur_index;
input_cols = other.input_cols;
max_depth = other.max_depth;
num_pages = other.num_pages;
}

constexpr start_offset_output_iterator operator+(int i)
{
return start_offset_output_iterator{
pages, page_indices, cur_index + i, src_col_schema, nesting_depth};
pages, page_indices, cur_index + i, input_cols, max_depth, num_pages};
}

constexpr void operator++() { cur_index++; }
Expand All @@ -1290,11 +1327,15 @@ struct start_offset_output_iterator {
private:
__device__ reference dereference(int index)
{
gpu::PageInfo const& p = pages[page_indices[index]];
if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) {
auto indices = get_indices{index, max_depth, num_pages};
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved

gpu::PageInfo const& p = pages[page_indices[indices.page_idx]];
if (p.src_col_schema != input_cols[indices.col_idx].schema_idx ||
p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
return empty;
}
return p.nesting_decode[nesting_depth].page_start_value;
return p.nesting_decode[indices.depth_idx].page_start_value;
}
};

Expand Down Expand Up @@ -1616,52 +1657,70 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses

// compute output column sizes by examining the pages of the -input- columns
if (has_lists) {
auto& page_keys = _chunk_itm_data.page_keys;
auto& page_index = _chunk_itm_data.page_index;

hostdevice_vector<input_col_info> input_cols{_input_columns.size(), _stream};
size_t max_depth = 0;
for (size_t i = 0; i < _input_columns.size(); i++) {
auto depth = _input_columns[i].nesting_depth();
max_depth = depth > max_depth ? depth : max_depth;
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
input_cols[i].nesting_depth = depth;
input_cols[i].schema_idx = _input_columns[i].schema_idx;
}
input_cols.host_to_device(_stream);
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved

// size iterator. indexes pages by sorted order
auto size_input = cudf::detail::make_counting_transform_iterator(
0,
get_page_nesting_size{
input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()});

auto reduction_keys =
cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()});
hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};
auto num_keys = _input_columns.size() * max_depth * pages.size();
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved

// find the size of each column
thrust::reduce_by_key(rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input,
thrust::make_discard_iterator(),
sizes.d_begin());

// for nested hierarchies, compute per-page start offset
thrust::exclusive_scan_by_key(rmm::exec_policy(_stream),
reduction_keys,
reduction_keys + num_keys,
size_input,
start_offset_output_iterator{pages.device_ptr(),
page_index.begin(),
0,
input_cols.device_ptr(),
max_depth,
pages.size()});
Copy link
Contributor

@ttnghia ttnghia Mar 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can even do better by combining these 2 kernel calls? Since they are operating on the same reduction_keys. Maybe just one reduce_by_key with a custom device lambda/functor that can do both reduce and scan?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not quite sure how to create a functor for reduce_by_key that would also perform the scan but it could be possible. Do you have an idea in mind?


sizes.device_to_host(_stream, true);
for (size_t idx = 0; idx < _input_columns.size(); idx++) {
auto const& input_col = _input_columns[idx];
auto src_col_schema = input_col.schema_idx;
size_t max_depth = input_col.nesting_depth();

auto* cols = &_output_buffers;
auto* cols = &_output_buffers;
for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
auto& out_buf = (*cols)[input_col.nesting[l_idx]];
cols = &out_buf.children;

// size iterator. indexes pages by sorted order
auto size_input = thrust::make_transform_iterator(
page_index.begin(),
get_page_nesting_size{src_col_schema, static_cast<size_type>(l_idx), pages.device_ptr()});

// if this buffer is part of a list hierarchy, we need to determine its
// final size and allocate it here.
//
// for struct columns, higher levels of the output columns are shared between input
// columns. so don't compute any given level more than once.
if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) {
int size =
thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size());
int size = sizes[(idx * max_depth) + l_idx];
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved

// if this is a list column add 1 for non-leaf levels for the terminating offset
if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }

// allocate
out_buf.create(size, _stream, _mr);
}

// for nested hierarchies, compute per-page start offset
if (input_col.has_repetition) {
thrust::exclusive_scan_by_key(
rmm::exec_policy(_stream),
page_keys.begin(),
page_keys.end(),
size_input,
start_offset_output_iterator{pages.device_ptr(),
page_index.begin(),
0,
static_cast<int>(src_col_schema),
static_cast<int>(l_idx)});
}
}
}
}
Expand Down