Commit 7448aa5

Reviewer comments
SrikarVanavasam committed Mar 28, 2023
1 parent ebb2c22 commit 7448aa5
Showing 1 changed file with 66 additions and 61 deletions.
127 changes: 66 additions & 61 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1212,24 +1212,25 @@ struct get_page_schema {
 };
 
 struct input_col_info {
-  int schema_idx;
-  size_t nesting_depth;
+  int const schema_idx;
+  size_type const nesting_depth;
 };
 
 /**
  * @brief Converts a 1-dimensional index into page, depth and column indices used in
- * allocate_columns to compute columns sizes. The input index will iterate through pages, nesting depth and
- * column indices in that order.
+ * allocate_columns to compute columns sizes.
+ *
+ * The input index will iterate through pages, nesting depth and column indices in that order.
  */
-struct get_indices {
-  size_t page_idx;
-  size_t depth_idx;
-  size_t col_idx;
-
-  __device__ get_indices(int index, size_t max_depth, size_t num_pages)
-    : page_idx(index % num_pages),
-      depth_idx((index / num_pages) % max_depth),
-      col_idx(index / (max_depth * num_pages))
+struct reduction_indices {
+  const size_t _page_idx;
+  const size_type _depth_idx;
+  const size_type _col_idx;
+
+  __device__ reduction_indices(size_t index, size_type max_depth, size_t num_pages)
+    : _page_idx(index % num_pages),
+      _depth_idx((index / num_pages) % max_depth),
+      _col_idx(index / (max_depth * num_pages))
   {
   }
 };
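
Why the rename matters: reduction_indices decodes one flat work index into page, depth, and column coordinates, with the page index varying fastest, then depth, then column. A minimal standalone sketch of the same index math (toy sizes and hypothetical variable names, not the reader's real values):

#include <cstddef>
#include <cstdio>

// Mirrors reduction_indices: page varies fastest, then depth, then column.
int main()
{
  std::size_t const num_pages = 3;
  std::size_t const max_depth = 2;
  std::size_t const num_cols  = 2;

  for (std::size_t index = 0; index < num_cols * max_depth * num_pages; index++) {
    std::size_t const page_idx  = index % num_pages;
    std::size_t const depth_idx = (index / num_pages) % max_depth;
    std::size_t const col_idx   = index / (max_depth * num_pages);
    std::printf("index %zu -> page %zu, depth %zu, col %zu\n", index, page_idx, depth_idx, col_idx);
  }
}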
@@ -1238,30 +1239,30 @@ struct get_indices {
  * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema.
  */
 struct get_page_nesting_size {
-  input_col_info* input_cols;
-  size_t max_depth;
-  size_t num_pages;
+  input_col_info const* input_cols;
+  const size_type max_depth;
+  const size_t num_pages;
   gpu::PageInfo const* const pages;
   int const* page_indices;
 
-  __device__ size_type operator()(int index) const
+  __device__ size_type operator()(size_t index) const
   {
-    auto indices = get_indices{index, max_depth, num_pages};
+    auto const indices = reduction_indices{index, max_depth, num_pages};
 
-    auto const& page = pages[page_indices[indices.page_idx]];
-    if (page.src_col_schema != input_cols[indices.col_idx].schema_idx ||
+    auto const& page = pages[page_indices[indices._page_idx]];
+    if (page.src_col_schema != input_cols[indices._col_idx].schema_idx ||
         page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
-        indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
+        indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) {
       return 0;
     }
 
-    return page.nesting[indices.depth_idx].batch_size;
+    return page.nesting[indices._depth_idx].batch_size;
   }
 };
 
 struct get_reduction_key {
-  size_t num_pages;
-  __device__ size_type operator()(int index) { return index / num_pages; }
+  const size_t num_pages;
+  __device__ size_t operator()(size_t index) const { return index / num_pages; }
 };
 
 /**
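
get_reduction_key pairs with the reduce_by_key call further down: every run of num_pages consecutive flat indices (all pages for one column and depth pair) maps to the same key, so each pair reduces to a single total size. The same segmented sum as a plain host loop, a sketch with toy data (page_sizes is a hypothetical stand-in for the batch_size values that get_page_nesting_size returns):

#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  // Toy per-(column, depth, page) sizes, laid out page-fastest as in the reader.
  std::size_t const num_pages = 3;
  std::vector<int> const page_sizes{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};

  // Equivalent of reduce_by_key keyed by index / num_pages: each (column, depth)
  // slot sums the contributions of all of its pages.
  std::vector<int> sizes(page_sizes.size() / num_pages, 0);
  for (std::size_t index = 0; index < page_sizes.size(); index++) {
    sizes[index / num_pages] += page_sizes[index];
  }
  for (std::size_t k = 0; k < sizes.size(); k++) {
    std::printf("(column, depth) slot %zu -> size %d\n", k, sizes[k]);  // 6, 15, 24, 33
  }
}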
@@ -1292,9 +1293,9 @@ struct chunk_row_output_iter {
 struct start_offset_output_iterator {
   gpu::PageInfo* pages;
   int const* page_indices;
-  int cur_index;
-  input_col_info* input_cols;
-  size_t max_depth;
+  size_t cur_index;
+  input_col_info const* input_cols;
+  size_type max_depth;
   size_t num_pages;
   int empty = 0;
   using value_type = size_type;
@@ -1313,29 +1314,29 @@ struct start_offset_output_iterator {
     num_pages    = other.num_pages;
   }
 
-  constexpr start_offset_output_iterator operator+(int i)
+  constexpr start_offset_output_iterator operator+(size_t i)
   {
     return start_offset_output_iterator{
       pages, page_indices, cur_index + i, input_cols, max_depth, num_pages};
   }
 
   constexpr void operator++() { cur_index++; }
 
-  __device__ reference operator[](int i) { return dereference(cur_index + i); }
+  __device__ reference operator[](size_t i) { return dereference(cur_index + i); }
   __device__ reference operator*() { return dereference(cur_index); }
 
  private:
-  __device__ reference dereference(int index)
+  __device__ reference dereference(size_t index)
   {
-    auto indices = get_indices{index, max_depth, num_pages};
+    auto const indices = reduction_indices{index, max_depth, num_pages};
 
-    gpu::PageInfo const& p = pages[page_indices[indices.page_idx]];
-    if (p.src_col_schema != input_cols[indices.col_idx].schema_idx ||
+    gpu::PageInfo const& p = pages[page_indices[indices._page_idx]];
+    if (p.src_col_schema != input_cols[indices._col_idx].schema_idx ||
         p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY ||
-        indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) {
+        indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) {
       return empty;
     }
-    return p.nesting_decode[indices.depth_idx].page_start_value;
+    return p.nesting_decode[indices._depth_idx].page_start_value;
   }
 };
 
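start_offset_output_iterator is the write side of the same indexing scheme: the exclusive scan writes through it, and dereference routes each result either into the matching page's nesting_decode[depth].page_start_value or into the throwaway empty member when the flat index does not name a real page, depth, and column combination. A stripped-down sketch of that scatter-with-discard pattern (host-only, with a hypothetical scatter_output_iterator and none of the Parquet mapping):

#include <cstdio>

// Each write lands in the target array when the index maps to a real slot and
// in a discard member otherwise (the role 'empty' plays above).
struct scatter_output_iterator {
  int* targets;
  int num_valid;  // indices at or beyond this are discarded
  int cur_index;
  int discard = 0;

  scatter_output_iterator operator+(int i) const { return {targets, num_valid, cur_index + i}; }
  void operator++() { cur_index++; }
  int& operator*() { return cur_index < num_valid ? targets[cur_index] : discard; }
};

int main()
{
  int out[3] = {0, 0, 0};
  scatter_output_iterator it{out, 3, 0};
  for (int v = 10; v < 15; v++) {  // the last two writes hit the discard slot
    *it = v;
    ++it;
  }
  std::printf("%d %d %d\n", out[0], out[1], out[2]);  // prints: 10 11 12
}
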
@@ -1659,26 +1660,32 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
   if (has_lists) {
     auto& page_index = _chunk_itm_data.page_index;
 
-    hostdevice_vector<input_col_info> input_cols{_input_columns.size(), _stream};
-    size_t max_depth = 0;
-    for (size_t i = 0; i < _input_columns.size(); i++) {
-      auto depth                  = _input_columns[i].nesting_depth();
-      max_depth                   = depth > max_depth ? depth : max_depth;
-      input_cols[i].nesting_depth = depth;
-      input_cols[i].schema_idx    = _input_columns[i].schema_idx;
-    }
-    input_cols.host_to_device(_stream);
+    std::vector<input_col_info> h_cols_info;
+    h_cols_info.reserve(_input_columns.size());
+    std::transform(_input_columns.cbegin(),
+                   _input_columns.cend(),
+                   std::back_inserter(h_cols_info),
+                   [](auto& col) -> input_col_info {
+                     return {col.schema_idx, static_cast<size_type>(col.nesting_depth())};
+                   });
+    auto const max_depth =
+      (*std::max_element(h_cols_info.cbegin(),
+                         h_cols_info.cend(),
+                         [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; }))
+        .nesting_depth;
+    auto const d_cols_info = cudf::detail::make_device_uvector_async(
+      h_cols_info, _stream, rmm::mr::get_current_device_resource());
 
     // size iterator. indexes pages by sorted order
-    auto size_input = cudf::detail::make_counting_transform_iterator(
+    auto const size_input = cudf::detail::make_counting_transform_iterator(
       0,
       get_page_nesting_size{
-        input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()});
+        d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()});
 
-    auto reduction_keys =
+    auto const reduction_keys =
       cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()});
     hostdevice_vector<size_t> sizes{_input_columns.size() * max_depth, _stream};
-    auto num_keys = _input_columns.size() * max_depth * pages.size();
+    auto const num_keys = _input_columns.size() * max_depth * pages.size();
 
     // find the size of each column
     thrust::reduce_by_key(rmm::exec_policy(_stream),
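
The host-side setup above now builds the input_col_info records with std::transform, takes the maximum nesting depth with std::max_element, and uploads the vector to the GPU once with make_device_uvector_async, rather than filling a hostdevice_vector field by field in a hand-written loop. The standard-library half can be exercised on its own; a sketch in which the hypothetical fake_col stands in for the reader's input column type and plain int for cudf's size_type:

#include <algorithm>
#include <cstdio>
#include <iterator>
#include <vector>

struct input_col_info {
  int const schema_idx;
  int const nesting_depth;
};

struct fake_col {  // hypothetical stand-in for the reader's input column type
  int schema_idx;
  int depth;
  int nesting_depth() const { return depth; }
};

int main()
{
  std::vector<fake_col> const cols{{7, 1}, {9, 3}, {11, 2}};

  std::vector<input_col_info> h_cols_info;
  h_cols_info.reserve(cols.size());
  std::transform(cols.cbegin(), cols.cend(), std::back_inserter(h_cols_info),
                 [](auto& col) -> input_col_info {
                   return {col.schema_idx, col.nesting_depth()};
                 });

  auto const max_depth =
    std::max_element(h_cols_info.cbegin(), h_cols_info.cend(), [](auto& l, auto& r) {
      return l.nesting_depth < r.nesting_depth;
    })->nesting_depth;

  std::printf("max_depth = %d\n", max_depth);  // prints: max_depth = 3
}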
@@ -1689,22 +1696,20 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
                           sizes.d_begin());
 
     // for nested hierarchies, compute per-page start offset
-    thrust::exclusive_scan_by_key(rmm::exec_policy(_stream),
-                                  reduction_keys,
-                                  reduction_keys + num_keys,
-                                  size_input,
-                                  start_offset_output_iterator{pages.device_ptr(),
-                                                               page_index.begin(),
-                                                               0,
-                                                               input_cols.device_ptr(),
-                                                               max_depth,
-                                                               pages.size()});
+    thrust::exclusive_scan_by_key(
+      rmm::exec_policy(_stream),
+      reduction_keys,
+      reduction_keys + num_keys,
+      size_input,
+      start_offset_output_iterator{
+        pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()});
 
     sizes.device_to_host(_stream, true);
-    for (size_t idx = 0; idx < _input_columns.size(); idx++) {
+    for (size_type idx = 0; idx < static_cast<size_type>(_input_columns.size()); idx++) {
       auto const& input_col = _input_columns[idx];
       auto* cols            = &_output_buffers;
-      for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) {
+      for (size_type l_idx = 0; l_idx < static_cast<size_type>(input_col.nesting_depth());
+           l_idx++) {
         auto& out_buf = (*cols)[input_col.nesting[l_idx]];
         cols          = &out_buf.children;
         // if this buffer is part of a list hierarchy, we need to determine its
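
The exclusive scan above turns each segment of per-page batch sizes into running start offsets, which is exactly what page_start_value records: where in the output each page begins writing. One segment worked through by hand (toy numbers):

#include <cstdio>

int main()
{
  // Per-page batch sizes for one (column, depth) segment...
  int const batch_sizes[4] = {5, 3, 0, 7};

  // ...and their exclusive scan: the offset at which each page starts writing.
  int start = 0;
  for (int i = 0; i < 4; i++) {
    std::printf("page %d: page_start_value = %d\n", i, start);  // 0, 5, 8, 8
    start += batch_sizes[i];
  }
}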
@@ -1713,7 +1718,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
         // for struct columns, higher levels of the output columns are shared between input
         // columns. so don't compute any given level more than once.
         if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) {
-          int size = sizes[(idx * max_depth) + l_idx];
+          auto size = sizes[(idx * max_depth) + l_idx];
 
           // if this is a list column add 1 for non-leaf levels for the terminating offset
           if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }