From 7448aa5dcd94c3375da185cc2b676dd3973629e5 Mon Sep 17 00:00:00 2001 From: srikarvanavasam Date: Mon, 27 Mar 2023 17:55:54 -0700 Subject: [PATCH] Reviewer comments --- cpp/src/io/parquet/reader_impl_preprocess.cu | 127 ++++++++++--------- 1 file changed, 66 insertions(+), 61 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 6bf729fc88b..5f09ec33811 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1212,24 +1212,25 @@ struct get_page_schema { }; struct input_col_info { - int schema_idx; - size_t nesting_depth; + int const schema_idx; + size_type const nesting_depth; }; /** * @brief Converts a 1-dimensional index into page, depth and column indices used in - * allocate_columns to compute columns sizes. The input index will iterate through pages, nesting depth and - * column indices in that order. + * allocate_columns to compute columns sizes. + * + * The input index will iterate through pages, nesting depth and column indices in that order. */ -struct get_indices { - size_t page_idx; - size_t depth_idx; - size_t col_idx; - - __device__ get_indices(int index, size_t max_depth, size_t num_pages) - : page_idx(index % num_pages), - depth_idx((index / num_pages) % max_depth), - col_idx(index / (max_depth * num_pages)) +struct reduction_indices { + const size_t _page_idx; + const size_type _depth_idx; + const size_type _col_idx; + + __device__ reduction_indices(size_t index, size_type max_depth, size_t num_pages) + : _page_idx(index % num_pages), + _depth_idx((index / num_pages) % max_depth), + _col_idx(index / (max_depth * num_pages)) { } }; @@ -1238,30 +1239,30 @@ struct get_indices { * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ struct get_page_nesting_size { - input_col_info* input_cols; - size_t max_depth; - size_t num_pages; + input_col_info const* input_cols; + const size_type max_depth; + const size_t num_pages; gpu::PageInfo const* const pages; int const* page_indices; - __device__ size_type operator()(int index) const + __device__ size_type operator()(size_t index) const { - auto indices = get_indices{index, max_depth, num_pages}; + auto const indices = reduction_indices{index, max_depth, num_pages}; - auto const& page = pages[page_indices[indices.page_idx]]; - if (page.src_col_schema != input_cols[indices.col_idx].schema_idx || + auto const& page = pages[page_indices[indices._page_idx]]; + if (page.src_col_schema != input_cols[indices._col_idx].schema_idx || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { + indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) { return 0; } - return page.nesting[indices.depth_idx].batch_size; + return page.nesting[indices._depth_idx].batch_size; } }; struct get_reduction_key { - size_t num_pages; - __device__ size_type operator()(int index) { return index / num_pages; } + const size_t num_pages; + __device__ size_t operator()(size_t index) const { return index / num_pages; } }; /** @@ -1292,9 +1293,9 @@ struct chunk_row_output_iter { struct start_offset_output_iterator { gpu::PageInfo* pages; int const* page_indices; - int cur_index; - input_col_info* input_cols; - size_t max_depth; + size_t cur_index; + input_col_info const* input_cols; + size_type max_depth; size_t num_pages; int empty = 0; using value_type = size_type; @@ -1313,7 +1314,7 @@ struct start_offset_output_iterator { num_pages = other.num_pages; } - constexpr start_offset_output_iterator operator+(int i) + constexpr start_offset_output_iterator operator+(size_t i) { return start_offset_output_iterator{ pages, page_indices, cur_index + i, input_cols, max_depth, num_pages}; @@ -1321,21 +1322,21 @@ struct start_offset_output_iterator { constexpr void operator++() { cur_index++; } - __device__ reference operator[](int i) { return dereference(cur_index + i); } + __device__ reference operator[](size_t i) { return dereference(cur_index + i); } __device__ reference operator*() { return dereference(cur_index); } private: - __device__ reference dereference(int index) + __device__ reference dereference(size_t index) { - auto indices = get_indices{index, max_depth, num_pages}; + auto const indices = reduction_indices{index, max_depth, num_pages}; - gpu::PageInfo const& p = pages[page_indices[indices.page_idx]]; - if (p.src_col_schema != input_cols[indices.col_idx].schema_idx || + gpu::PageInfo const& p = pages[page_indices[indices._page_idx]]; + if (p.src_col_schema != input_cols[indices._col_idx].schema_idx || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { + indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) { return empty; } - return p.nesting_decode[indices.depth_idx].page_start_value; + return p.nesting_decode[indices._depth_idx].page_start_value; } }; @@ -1659,26 +1660,32 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses if (has_lists) { auto& page_index = _chunk_itm_data.page_index; - hostdevice_vector input_cols{_input_columns.size(), _stream}; - size_t max_depth = 0; - for (size_t i = 0; i < _input_columns.size(); i++) { - auto depth = _input_columns[i].nesting_depth(); - max_depth = depth > max_depth ? depth : max_depth; - input_cols[i].nesting_depth = depth; - input_cols[i].schema_idx = _input_columns[i].schema_idx; - } - input_cols.host_to_device(_stream); + std::vector h_cols_info; + h_cols_info.reserve(_input_columns.size()); + std::transform(_input_columns.cbegin(), + _input_columns.cend(), + std::back_inserter(h_cols_info), + [](auto& col) -> input_col_info { + return {static_cast(col.nesting_depth()), col.schema_idx}; + }); + auto const max_depth = + (*std::max_element(h_cols_info.cbegin(), + h_cols_info.cend(), + [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; })) + .nesting_depth; + auto const d_cols_info = cudf::detail::make_device_uvector_async( + h_cols_info, _stream, rmm::mr::get_current_device_resource()); // size iterator. indexes pages by sorted order - auto size_input = cudf::detail::make_counting_transform_iterator( + auto const size_input = cudf::detail::make_counting_transform_iterator( 0, get_page_nesting_size{ - input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); + d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); - auto reduction_keys = + auto const reduction_keys = cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()}); hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; - auto num_keys = _input_columns.size() * max_depth * pages.size(); + auto const num_keys = _input_columns.size() * max_depth * pages.size(); // find the size of each column thrust::reduce_by_key(rmm::exec_policy(_stream), @@ -1689,22 +1696,20 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses sizes.d_begin()); // for nested hierarchies, compute per-page start offset - thrust::exclusive_scan_by_key(rmm::exec_policy(_stream), - reduction_keys, - reduction_keys + num_keys, - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - input_cols.device_ptr(), - max_depth, - pages.size()}); + thrust::exclusive_scan_by_key( + rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + num_keys, + size_input, + start_offset_output_iterator{ + pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()}); sizes.device_to_host(_stream, true); - for (size_t idx = 0; idx < _input_columns.size(); idx++) { + for (size_type idx = 0; idx < static_cast(_input_columns.size()); idx++) { auto const& input_col = _input_columns[idx]; auto* cols = &_output_buffers; - for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { + for (size_type l_idx = 0; l_idx < static_cast(input_col.nesting_depth()); + l_idx++) { auto& out_buf = (*cols)[input_col.nesting[l_idx]]; cols = &out_buf.children; // if this buffer is part of a list hierarchy, we need to determine it's @@ -1713,7 +1718,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // for struct columns, higher levels of the output columns are shared between input // columns. so don't compute any given level more than once. if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { - int size = sizes[(idx * max_depth) + l_idx]; + auto size = sizes[(idx * max_depth) + l_idx]; // if this is a list column add 1 for non-leaf levels for the terminating offset if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }