diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index e5c2b7aa842..72641adb402 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1215,24 +1215,60 @@ struct get_page_schema { __device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; } }; +struct input_col_info { + int const schema_idx; + size_type const nesting_depth; +}; + +/** + * @brief Converts a 1-dimensional index into page, depth and column indices used in + * allocate_columns to compute columns sizes. + * + * The input index will iterate through pages, nesting depth and column indices in that order. + */ +struct reduction_indices { + size_t const page_idx; + size_type const depth_idx; + size_type const col_idx; + + __device__ reduction_indices(size_t index_, size_type max_depth_, size_t num_pages_) + : page_idx(index_ % num_pages_), + depth_idx((index_ / num_pages_) % max_depth_), + col_idx(index_ / (max_depth_ * num_pages_)) + { + } +}; + /** * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ struct get_page_nesting_size { - size_type const src_col_schema; - size_type const depth; + input_col_info const* const input_cols; + size_type const max_depth; + size_t const num_pages; gpu::PageInfo const* const pages; + int const* page_indices; - __device__ size_type operator()(int index) const + __device__ size_type operator()(size_t index) const { - auto const& page = pages[index]; - if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + auto const indices = reduction_indices{index, max_depth, num_pages}; + + auto const& page = pages[page_indices[indices.page_idx]]; + if (page.src_col_schema != input_cols[indices.col_idx].schema_idx || + page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || + indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { return 0; } - return page.nesting[depth].batch_size; + + return page.nesting[indices.depth_idx].batch_size; } }; +struct get_reduction_key { + size_t const num_pages; + __device__ size_t operator()(size_t index) const { return index / num_pages; } +}; + /** * @brief Writes to the chunk_row field of the PageInfo struct. */ @@ -1259,11 +1295,12 @@ struct chunk_row_output_iter { * @brief Writes to the page_start_value field of the PageNestingInfo struct, keyed by schema. */ struct start_offset_output_iterator { - gpu::PageInfo* pages; + gpu::PageInfo const* pages; int const* page_indices; - int cur_index; - int src_col_schema; - int nesting_depth; + size_t cur_index; + input_col_info const* input_cols; + size_type max_depth; + size_t num_pages; int empty = 0; using value_type = size_type; using difference_type = size_type; @@ -1273,32 +1310,37 @@ struct start_offset_output_iterator { constexpr void operator=(start_offset_output_iterator const& other) { - pages = other.pages; - page_indices = other.page_indices; - cur_index = other.cur_index; - src_col_schema = other.src_col_schema; - nesting_depth = other.nesting_depth; + pages = other.pages; + page_indices = other.page_indices; + cur_index = other.cur_index; + input_cols = other.input_cols; + max_depth = other.max_depth; + num_pages = other.num_pages; } - constexpr start_offset_output_iterator operator+(int i) + constexpr start_offset_output_iterator operator+(size_t i) { return start_offset_output_iterator{ - pages, page_indices, cur_index + i, src_col_schema, nesting_depth}; + pages, page_indices, cur_index + i, input_cols, max_depth, num_pages}; } constexpr void operator++() { cur_index++; } - __device__ reference operator[](int i) { return dereference(cur_index + i); } + __device__ reference operator[](size_t i) { return dereference(cur_index + i); } __device__ reference operator*() { return dereference(cur_index); } private: - __device__ reference dereference(int index) + __device__ reference dereference(size_t index) { - gpu::PageInfo const& p = pages[page_indices[index]]; - if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + auto const indices = reduction_indices{index, max_depth, num_pages}; + + gpu::PageInfo const& p = pages[page_indices[indices.page_idx]]; + if (p.src_col_schema != input_cols[indices.col_idx].schema_idx || + p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || + indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { return empty; } - return p.nesting_decode[nesting_depth].page_start_value; + return p.nesting_decode[indices.depth_idx].page_start_value; } }; @@ -1620,31 +1662,72 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // compute output column sizes by examining the pages of the -input- columns if (has_lists) { - auto& page_keys = _chunk_itm_data.page_keys; auto& page_index = _chunk_itm_data.page_index; - for (size_t idx = 0; idx < _input_columns.size(); idx++) { - auto const& input_col = _input_columns[idx]; - auto src_col_schema = input_col.schema_idx; - size_t max_depth = input_col.nesting_depth(); - auto* cols = &_output_buffers; - for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { + std::vector h_cols_info; + h_cols_info.reserve(_input_columns.size()); + std::transform(_input_columns.cbegin(), + _input_columns.cend(), + std::back_inserter(h_cols_info), + [](auto& col) -> input_col_info { + return {col.schema_idx, static_cast(col.nesting_depth())}; + }); + + auto const max_depth = + (*std::max_element(h_cols_info.cbegin(), + h_cols_info.cend(), + [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; })) + .nesting_depth; + + auto const d_cols_info = cudf::detail::make_device_uvector_async( + h_cols_info, _stream, rmm::mr::get_current_device_resource()); + + auto const num_keys = _input_columns.size() * max_depth * pages.size(); + // size iterator. indexes pages by sorted order + rmm::device_uvector size_input{num_keys, _stream}; + thrust::transform( + rmm::exec_policy(_stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_keys), + size_input.begin(), + get_page_nesting_size{ + d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); + auto const reduction_keys = + cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()}); + hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; + + // find the size of each column + thrust::reduce_by_key(rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + num_keys, + size_input.cbegin(), + thrust::make_discard_iterator(), + sizes.d_begin()); + + // for nested hierarchies, compute per-page start offset + thrust::exclusive_scan_by_key( + rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + num_keys, + size_input.cbegin(), + start_offset_output_iterator{ + pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()}); + + sizes.device_to_host(_stream, true); + for (size_type idx = 0; idx < static_cast(_input_columns.size()); idx++) { + auto const& input_col = _input_columns[idx]; + auto* cols = &_output_buffers; + for (size_type l_idx = 0; l_idx < static_cast(input_col.nesting_depth()); + l_idx++) { auto& out_buf = (*cols)[input_col.nesting[l_idx]]; cols = &out_buf.children; - - // size iterator. indexes pages by sorted order - auto size_input = thrust::make_transform_iterator( - page_index.begin(), - get_page_nesting_size{src_col_schema, static_cast(l_idx), pages.device_ptr()}); - // if this buffer is part of a list hierarchy, we need to determine it's // final size and allocate it here. // // for struct columns, higher levels of the output columns are shared between input // columns. so don't compute any given level more than once. if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { - int size = - thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size()); + auto size = sizes[(idx * max_depth) + l_idx]; // if this is a list column add 1 for non-leaf levels for the terminating offset if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } @@ -1652,20 +1735,6 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // allocate out_buf.create(size, _stream, _mr); } - - // for nested hierarchies, compute per-page start offset - if (input_col.has_repetition) { - thrust::exclusive_scan_by_key( - rmm::exec_policy(_stream), - page_keys.begin(), - page_keys.end(), - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - static_cast(src_col_schema), - static_cast(l_idx)}); - } } } }