From c863d77291c571979799eef09584aaaf4b768d65 Mon Sep 17 00:00:00 2001 From: srikarvanavasam Date: Thu, 16 Feb 2023 20:05:28 -0800 Subject: [PATCH 01/12] nested column size calculation into one kernel --- cpp/src/io/parquet/reader_impl_preprocess.cu | 117 ++++++++++++------- 1 file changed, 77 insertions(+), 40 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 6577a1a3f0f..6b79c59bf50 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1186,20 +1186,33 @@ struct get_page_schema { * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ struct get_page_nesting_size { - size_type const src_col_schema; - size_type const depth; + int* input_cols_schema; + size_t* input_cols_depth; + size_t max_depth; + size_t num_pages; gpu::PageInfo const* const pages; + int const* page_indices; __device__ size_type operator()(int index) const { - auto const& page = pages[index]; - if (page.src_col_schema != src_col_schema || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + auto page_idx = index % num_pages; + auto depth_idx = (index / num_pages) % max_depth; + auto col_idx = index / (max_depth * num_pages); + + auto const& page = pages[page_indices[page_idx]]; + if (page.src_col_schema != input_cols_schema[col_idx] || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols_depth[col_idx]) { return 0; } - return page.nesting[depth].batch_size; + + return page.nesting[depth_idx].batch_size; } }; +struct get_reduction_key { + size_t num_pages; + __device__ size_type operator()(int index) { return index / num_pages; } +}; + /** * @brief Writes to the chunk_row field of the PageInfo struct. */ @@ -1229,8 +1242,10 @@ struct start_offset_output_iterator { gpu::PageInfo* pages; int const* page_indices; int cur_index; - int src_col_schema; - int nesting_depth; + int* input_cols_schema; + size_t* input_cols_depth; + size_t max_depth; + size_t num_pages; int empty = 0; using value_type = size_type; using difference_type = size_type; @@ -1243,14 +1258,16 @@ struct start_offset_output_iterator { pages = other.pages; page_indices = other.page_indices; cur_index = other.cur_index; - src_col_schema = other.src_col_schema; - nesting_depth = other.nesting_depth; + input_cols_schema = other.input_cols_schema; + input_cols_depth = other.input_cols_depth; + max_depth = other.max_depth; + num_pages = other.num_pages; } constexpr start_offset_output_iterator operator+(int i) { return start_offset_output_iterator{ - pages, page_indices, cur_index + i, src_col_schema, nesting_depth}; + pages, page_indices, cur_index + i, input_cols_schema, input_cols_depth, max_depth, num_pages}; } constexpr void operator++() { cur_index++; } @@ -1261,11 +1278,15 @@ struct start_offset_output_iterator { private: __device__ reference dereference(int index) { - gpu::PageInfo const& p = pages[page_indices[index]]; - if (p.src_col_schema != src_col_schema || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY) { + auto page_idx = index % num_pages; + auto depth_idx = (index / num_pages) % max_depth; + auto col_idx = index / (max_depth * num_pages); + + gpu::PageInfo const& p = pages[page_indices[page_idx]]; + if (p.src_col_schema != input_cols_schema[col_idx] || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols_depth[col_idx]) { return empty; } - return p.nesting_decode[nesting_depth].page_start_value; + return p.nesting_decode[depth_idx].page_start_value; } }; @@ -1587,52 +1608,68 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // compute output column sizes by examining the pages of the -input- columns if (has_lists) { - auto& page_keys = _chunk_itm_data.page_keys; auto& page_index = _chunk_itm_data.page_index; + thrust::device_vector input_cols_schema(_input_columns.size()); + thrust::device_vector input_cols_depth(_input_columns.size()); + for (size_t i = 0; i < _input_columns.size(); i++) + { + input_cols_schema[i] = _input_columns[i].schema_idx; + input_cols_depth[i] = _input_columns[i].nesting_depth(); + } + auto max_depth = *std::max_element(input_cols_depth.begin(), input_cols_depth.end()); + // size iterator. indexes pages by sorted order + auto size_input = cudf::detail::make_counting_transform_iterator( + 0, + get_page_nesting_size{input_cols_schema.data().get(), input_cols_depth.data().get(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); + + auto reduction_keys = cudf::detail::make_counting_transform_iterator( + 0, + get_reduction_key{pages.size()}); + thrust::device_vector sizes(_input_columns.size() * max_depth); + // find the size of each column + thrust::reduce_by_key( + rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + (_input_columns.size() * max_depth * pages.size()), + size_input, + thrust::make_discard_iterator(), + sizes.begin() + ); + + // for nested hierarchies, compute per-page start offset + thrust::exclusive_scan_by_key( + rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + (_input_columns.size() * max_depth * pages.size()), + size_input, + start_offset_output_iterator{pages.device_ptr(), + page_index.begin(), + 0, + input_cols_schema.data().get(), + input_cols_depth.data().get(), + max_depth, + pages.size()}); + for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const& input_col = _input_columns[idx]; - auto src_col_schema = input_col.schema_idx; - size_t max_depth = input_col.nesting_depth(); - auto* cols = &_output_buffers; for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { auto& out_buf = (*cols)[input_col.nesting[l_idx]]; cols = &out_buf.children; - - // size iterator. indexes pages by sorted order - auto size_input = thrust::make_transform_iterator( - page_index.begin(), - get_page_nesting_size{src_col_schema, static_cast(l_idx), pages.device_ptr()}); - // if this buffer is part of a list hierarchy, we need to determine it's // final size and allocate it here. // // for struct columns, higher levels of the output columns are shared between input // columns. so don't compute any given level more than once. if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { - int size = - thrust::reduce(rmm::exec_policy(_stream), size_input, size_input + pages.size()); + int size = sizes[(idx * max_depth) + l_idx]; // if this is a list column add 1 for non-leaf levels for the terminating offset if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } // allocate out_buf.create(size, _stream, _mr); - } - - // for nested hierarchies, compute per-page start offset - if (input_col.has_repetition) { - thrust::exclusive_scan_by_key( - rmm::exec_policy(_stream), - page_keys.begin(), - page_keys.end(), - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - static_cast(src_col_schema), - static_cast(l_idx)}); - } + } } } } From 222f031b85b669e79ef928a4efca185feca8eab6 Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Fri, 24 Feb 2023 13:00:54 -0800 Subject: [PATCH 02/12] use hostdevice_vector --- cpp/src/io/parquet/reader_impl_preprocess.cu | 47 +++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 6b79c59bf50..dd4c8c73e9d 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1182,12 +1182,16 @@ struct get_page_schema { __device__ size_type operator()(gpu::PageInfo const& page) { return page.src_col_schema; } }; +struct input_col_info { + int schema_idx; + size_t nesting_depth; +}; + /** * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ struct get_page_nesting_size { - int* input_cols_schema; - size_t* input_cols_depth; + input_col_info* input_cols; size_t max_depth; size_t num_pages; gpu::PageInfo const* const pages; @@ -1200,10 +1204,10 @@ struct get_page_nesting_size { auto col_idx = index / (max_depth * num_pages); auto const& page = pages[page_indices[page_idx]]; - if (page.src_col_schema != input_cols_schema[col_idx] || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols_depth[col_idx]) { + if (page.src_col_schema != input_cols[col_idx].schema_idx || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols[col_idx].nesting_depth) { return 0; } - + return page.nesting[depth_idx].batch_size; } }; @@ -1242,8 +1246,7 @@ struct start_offset_output_iterator { gpu::PageInfo* pages; int const* page_indices; int cur_index; - int* input_cols_schema; - size_t* input_cols_depth; + input_col_info* input_cols; size_t max_depth; size_t num_pages; int empty = 0; @@ -1258,8 +1261,7 @@ struct start_offset_output_iterator { pages = other.pages; page_indices = other.page_indices; cur_index = other.cur_index; - input_cols_schema = other.input_cols_schema; - input_cols_depth = other.input_cols_depth; + input_cols = other.input_cols; max_depth = other.max_depth; num_pages = other.num_pages; } @@ -1267,7 +1269,7 @@ struct start_offset_output_iterator { constexpr start_offset_output_iterator operator+(int i) { return start_offset_output_iterator{ - pages, page_indices, cur_index + i, input_cols_schema, input_cols_depth, max_depth, num_pages}; + pages, page_indices, cur_index + i, input_cols, max_depth, num_pages}; } constexpr void operator++() { cur_index++; } @@ -1283,7 +1285,7 @@ struct start_offset_output_iterator { auto col_idx = index / (max_depth * num_pages); gpu::PageInfo const& p = pages[page_indices[page_idx]]; - if (p.src_col_schema != input_cols_schema[col_idx] || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols_depth[col_idx]) { + if (p.src_col_schema != input_cols[col_idx].schema_idx || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols[col_idx].nesting_depth) { return empty; } return p.nesting_decode[depth_idx].page_start_value; @@ -1609,23 +1611,26 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // compute output column sizes by examining the pages of the -input- columns if (has_lists) { auto& page_index = _chunk_itm_data.page_index; - thrust::device_vector input_cols_schema(_input_columns.size()); - thrust::device_vector input_cols_depth(_input_columns.size()); + + hostdevice_vector input_cols{_input_columns.size(), _stream}; for (size_t i = 0; i < _input_columns.size(); i++) { - input_cols_schema[i] = _input_columns[i].schema_idx; - input_cols_depth[i] = _input_columns[i].nesting_depth(); + input_cols[i].schema_idx = _input_columns[i].schema_idx; + input_cols[i].nesting_depth = _input_columns[i].nesting_depth(); } - auto max_depth = *std::max_element(input_cols_depth.begin(), input_cols_depth.end()); + auto max_depth = std::max_element(input_cols.begin(), input_cols.end(), [](auto a, auto b){return a.nesting_depth < b.nesting_depth;})->nesting_depth; + input_cols.host_to_device(_stream); + // size iterator. indexes pages by sorted order auto size_input = cudf::detail::make_counting_transform_iterator( 0, - get_page_nesting_size{input_cols_schema.data().get(), input_cols_depth.data().get(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); + get_page_nesting_size{input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); auto reduction_keys = cudf::detail::make_counting_transform_iterator( 0, get_reduction_key{pages.size()}); - thrust::device_vector sizes(_input_columns.size() * max_depth); + hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; + // find the size of each column thrust::reduce_by_key( rmm::exec_policy(_stream), @@ -1633,7 +1638,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses reduction_keys + (_input_columns.size() * max_depth * pages.size()), size_input, thrust::make_discard_iterator(), - sizes.begin() + sizes.d_begin() ); // for nested hierarchies, compute per-page start offset @@ -1645,11 +1650,11 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses start_offset_output_iterator{pages.device_ptr(), page_index.begin(), 0, - input_cols_schema.data().get(), - input_cols_depth.data().get(), + input_cols.device_ptr(), max_depth, - pages.size()}); + pages.size()}); + sizes.device_to_host(_stream, true); for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const& input_col = _input_columns[idx]; auto* cols = &_output_buffers; From 17cba01832b2f79fba228364076217bfa3c298e2 Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Mon, 13 Mar 2023 12:55:04 -0700 Subject: [PATCH 03/12] added get_indeces --- cpp/src/io/parquet/reader_impl_preprocess.cu | 117 +++++++++++-------- 1 file changed, 67 insertions(+), 50 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index dd4c8c73e9d..2622648dcda 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1187,6 +1187,24 @@ struct input_col_info { size_t nesting_depth; }; +/** + * @brief Converts a 1-dimensional index into page, depth and column indices used in + * allocate_columns to compute columns sizes. The input index will through pages, nesting depth and + * columns in that order. + */ +struct get_indices { + size_t page_idx; + size_t depth_idx; + size_t col_idx; + + __device__ get_indices(int index, size_t max_depth, size_t num_pages) + : page_idx(index % num_pages), + depth_idx((index / num_pages) % max_depth), + col_idx(index / (max_depth * num_pages)) + { + } +}; + /** * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ @@ -1199,16 +1217,16 @@ struct get_page_nesting_size { __device__ size_type operator()(int index) const { - auto page_idx = index % num_pages; - auto depth_idx = (index / num_pages) % max_depth; - auto col_idx = index / (max_depth * num_pages); + auto indices = get_indices{index, max_depth, num_pages}; - auto const& page = pages[page_indices[page_idx]]; - if (page.src_col_schema != input_cols[col_idx].schema_idx || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols[col_idx].nesting_depth) { + auto const& page = pages[page_indices[indices.page_idx]]; + if (page.src_col_schema != input_cols[indices.col_idx].schema_idx || + page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || + indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { return 0; } - return page.nesting[depth_idx].batch_size; + return page.nesting[indices.depth_idx].batch_size; } }; @@ -1258,12 +1276,12 @@ struct start_offset_output_iterator { constexpr void operator=(start_offset_output_iterator const& other) { - pages = other.pages; - page_indices = other.page_indices; - cur_index = other.cur_index; - input_cols = other.input_cols; - max_depth = other.max_depth; - num_pages = other.num_pages; + pages = other.pages; + page_indices = other.page_indices; + cur_index = other.cur_index; + input_cols = other.input_cols; + max_depth = other.max_depth; + num_pages = other.num_pages; } constexpr start_offset_output_iterator operator+(int i) @@ -1280,15 +1298,15 @@ struct start_offset_output_iterator { private: __device__ reference dereference(int index) { - auto page_idx = index % num_pages; - auto depth_idx = (index / num_pages) % max_depth; - auto col_idx = index / (max_depth * num_pages); + auto indices = get_indices{index, max_depth, num_pages}; - gpu::PageInfo const& p = pages[page_indices[page_idx]]; - if (p.src_col_schema != input_cols[col_idx].schema_idx || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || depth_idx >= input_cols[col_idx].nesting_depth) { + gpu::PageInfo const& p = pages[page_indices[indices.page_idx]]; + if (p.src_col_schema != input_cols[indices.col_idx].schema_idx || + p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || + indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { return empty; } - return p.nesting_decode[depth_idx].page_start_value; + return p.nesting_decode[indices.depth_idx].page_start_value; } }; @@ -1611,53 +1629,52 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // compute output column sizes by examining the pages of the -input- columns if (has_lists) { auto& page_index = _chunk_itm_data.page_index; - + hostdevice_vector input_cols{_input_columns.size(), _stream}; - for (size_t i = 0; i < _input_columns.size(); i++) - { - input_cols[i].schema_idx = _input_columns[i].schema_idx; - input_cols[i].nesting_depth = _input_columns[i].nesting_depth(); + size_t max_depth = 0; + for (size_t i = 0; i < _input_columns.size(); i++) { + auto depth = _input_columns[i].nesting_depth(); + max_depth = depth > max_depth ? depth : max_depth; + input_cols[i].nesting_depth = depth; + input_cols[i].schema_idx = _input_columns[i].schema_idx; } - auto max_depth = std::max_element(input_cols.begin(), input_cols.end(), [](auto a, auto b){return a.nesting_depth < b.nesting_depth;})->nesting_depth; input_cols.host_to_device(_stream); // size iterator. indexes pages by sorted order auto size_input = cudf::detail::make_counting_transform_iterator( 0, - get_page_nesting_size{input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); + get_page_nesting_size{ + input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); - auto reduction_keys = cudf::detail::make_counting_transform_iterator( - 0, - get_reduction_key{pages.size()}); + auto reduction_keys = + cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()}); hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; + auto num_keys = _input_columns.size() * max_depth * pages.size(); - // find the size of each column - thrust::reduce_by_key( - rmm::exec_policy(_stream), - reduction_keys, - reduction_keys + (_input_columns.size() * max_depth * pages.size()), - size_input, - thrust::make_discard_iterator(), - sizes.d_begin() - ); + // find the size of each column + thrust::reduce_by_key(rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + num_keys, + size_input, + thrust::make_discard_iterator(), + sizes.d_begin()); // for nested hierarchies, compute per-page start offset - thrust::exclusive_scan_by_key( - rmm::exec_policy(_stream), - reduction_keys, - reduction_keys + (_input_columns.size() * max_depth * pages.size()), - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - input_cols.device_ptr(), - max_depth, - pages.size()}); + thrust::exclusive_scan_by_key(rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + num_keys, + size_input, + start_offset_output_iterator{pages.device_ptr(), + page_index.begin(), + 0, + input_cols.device_ptr(), + max_depth, + pages.size()}); sizes.device_to_host(_stream, true); for (size_t idx = 0; idx < _input_columns.size(); idx++) { auto const& input_col = _input_columns[idx]; - auto* cols = &_output_buffers; + auto* cols = &_output_buffers; for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { auto& out_buf = (*cols)[input_col.nesting[l_idx]]; cols = &out_buf.children; @@ -1674,7 +1691,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // allocate out_buf.create(size, _stream, _mr); - } + } } } } From ebb2c22eca68eb7ce2744410ac08a468f86b346a Mon Sep 17 00:00:00 2001 From: srikarvanavasam Date: Wed, 22 Mar 2023 08:33:12 -0700 Subject: [PATCH 04/12] Fix comment --- cpp/src/io/parquet/reader_impl_preprocess.cu | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 9a64e72a84d..6bf729fc88b 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1218,8 +1218,8 @@ struct input_col_info { /** * @brief Converts a 1-dimensional index into page, depth and column indices used in - * allocate_columns to compute columns sizes. The input index will through pages, nesting depth and - * columns in that order. + * allocate_columns to compute columns sizes. The input index will iterate through pages, nesting depth and + * column indices in that order. */ struct get_indices { size_t page_idx; From 7448aa5dcd94c3375da185cc2b676dd3973629e5 Mon Sep 17 00:00:00 2001 From: srikarvanavasam Date: Mon, 27 Mar 2023 17:55:54 -0700 Subject: [PATCH 05/12] Reviewer comments --- cpp/src/io/parquet/reader_impl_preprocess.cu | 127 ++++++++++--------- 1 file changed, 66 insertions(+), 61 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 6bf729fc88b..5f09ec33811 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1212,24 +1212,25 @@ struct get_page_schema { }; struct input_col_info { - int schema_idx; - size_t nesting_depth; + int const schema_idx; + size_type const nesting_depth; }; /** * @brief Converts a 1-dimensional index into page, depth and column indices used in - * allocate_columns to compute columns sizes. The input index will iterate through pages, nesting depth and - * column indices in that order. + * allocate_columns to compute columns sizes. + * + * The input index will iterate through pages, nesting depth and column indices in that order. */ -struct get_indices { - size_t page_idx; - size_t depth_idx; - size_t col_idx; - - __device__ get_indices(int index, size_t max_depth, size_t num_pages) - : page_idx(index % num_pages), - depth_idx((index / num_pages) % max_depth), - col_idx(index / (max_depth * num_pages)) +struct reduction_indices { + const size_t _page_idx; + const size_type _depth_idx; + const size_type _col_idx; + + __device__ reduction_indices(size_t index, size_type max_depth, size_t num_pages) + : _page_idx(index % num_pages), + _depth_idx((index / num_pages) % max_depth), + _col_idx(index / (max_depth * num_pages)) { } }; @@ -1238,30 +1239,30 @@ struct get_indices { * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ struct get_page_nesting_size { - input_col_info* input_cols; - size_t max_depth; - size_t num_pages; + input_col_info const* input_cols; + const size_type max_depth; + const size_t num_pages; gpu::PageInfo const* const pages; int const* page_indices; - __device__ size_type operator()(int index) const + __device__ size_type operator()(size_t index) const { - auto indices = get_indices{index, max_depth, num_pages}; + auto const indices = reduction_indices{index, max_depth, num_pages}; - auto const& page = pages[page_indices[indices.page_idx]]; - if (page.src_col_schema != input_cols[indices.col_idx].schema_idx || + auto const& page = pages[page_indices[indices._page_idx]]; + if (page.src_col_schema != input_cols[indices._col_idx].schema_idx || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { + indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) { return 0; } - return page.nesting[indices.depth_idx].batch_size; + return page.nesting[indices._depth_idx].batch_size; } }; struct get_reduction_key { - size_t num_pages; - __device__ size_type operator()(int index) { return index / num_pages; } + const size_t num_pages; + __device__ size_t operator()(size_t index) const { return index / num_pages; } }; /** @@ -1292,9 +1293,9 @@ struct chunk_row_output_iter { struct start_offset_output_iterator { gpu::PageInfo* pages; int const* page_indices; - int cur_index; - input_col_info* input_cols; - size_t max_depth; + size_t cur_index; + input_col_info const* input_cols; + size_type max_depth; size_t num_pages; int empty = 0; using value_type = size_type; @@ -1313,7 +1314,7 @@ struct start_offset_output_iterator { num_pages = other.num_pages; } - constexpr start_offset_output_iterator operator+(int i) + constexpr start_offset_output_iterator operator+(size_t i) { return start_offset_output_iterator{ pages, page_indices, cur_index + i, input_cols, max_depth, num_pages}; @@ -1321,21 +1322,21 @@ struct start_offset_output_iterator { constexpr void operator++() { cur_index++; } - __device__ reference operator[](int i) { return dereference(cur_index + i); } + __device__ reference operator[](size_t i) { return dereference(cur_index + i); } __device__ reference operator*() { return dereference(cur_index); } private: - __device__ reference dereference(int index) + __device__ reference dereference(size_t index) { - auto indices = get_indices{index, max_depth, num_pages}; + auto const indices = reduction_indices{index, max_depth, num_pages}; - gpu::PageInfo const& p = pages[page_indices[indices.page_idx]]; - if (p.src_col_schema != input_cols[indices.col_idx].schema_idx || + gpu::PageInfo const& p = pages[page_indices[indices._page_idx]]; + if (p.src_col_schema != input_cols[indices._col_idx].schema_idx || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { + indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) { return empty; } - return p.nesting_decode[indices.depth_idx].page_start_value; + return p.nesting_decode[indices._depth_idx].page_start_value; } }; @@ -1659,26 +1660,32 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses if (has_lists) { auto& page_index = _chunk_itm_data.page_index; - hostdevice_vector input_cols{_input_columns.size(), _stream}; - size_t max_depth = 0; - for (size_t i = 0; i < _input_columns.size(); i++) { - auto depth = _input_columns[i].nesting_depth(); - max_depth = depth > max_depth ? depth : max_depth; - input_cols[i].nesting_depth = depth; - input_cols[i].schema_idx = _input_columns[i].schema_idx; - } - input_cols.host_to_device(_stream); + std::vector h_cols_info; + h_cols_info.reserve(_input_columns.size()); + std::transform(_input_columns.cbegin(), + _input_columns.cend(), + std::back_inserter(h_cols_info), + [](auto& col) -> input_col_info { + return {static_cast(col.nesting_depth()), col.schema_idx}; + }); + auto const max_depth = + (*std::max_element(h_cols_info.cbegin(), + h_cols_info.cend(), + [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; })) + .nesting_depth; + auto const d_cols_info = cudf::detail::make_device_uvector_async( + h_cols_info, _stream, rmm::mr::get_current_device_resource()); // size iterator. indexes pages by sorted order - auto size_input = cudf::detail::make_counting_transform_iterator( + auto const size_input = cudf::detail::make_counting_transform_iterator( 0, get_page_nesting_size{ - input_cols.device_ptr(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); + d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); - auto reduction_keys = + auto const reduction_keys = cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()}); hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; - auto num_keys = _input_columns.size() * max_depth * pages.size(); + auto const num_keys = _input_columns.size() * max_depth * pages.size(); // find the size of each column thrust::reduce_by_key(rmm::exec_policy(_stream), @@ -1689,22 +1696,20 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses sizes.d_begin()); // for nested hierarchies, compute per-page start offset - thrust::exclusive_scan_by_key(rmm::exec_policy(_stream), - reduction_keys, - reduction_keys + num_keys, - size_input, - start_offset_output_iterator{pages.device_ptr(), - page_index.begin(), - 0, - input_cols.device_ptr(), - max_depth, - pages.size()}); + thrust::exclusive_scan_by_key( + rmm::exec_policy(_stream), + reduction_keys, + reduction_keys + num_keys, + size_input, + start_offset_output_iterator{ + pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()}); sizes.device_to_host(_stream, true); - for (size_t idx = 0; idx < _input_columns.size(); idx++) { + for (size_type idx = 0; idx < static_cast(_input_columns.size()); idx++) { auto const& input_col = _input_columns[idx]; auto* cols = &_output_buffers; - for (size_t l_idx = 0; l_idx < input_col.nesting_depth(); l_idx++) { + for (size_type l_idx = 0; l_idx < static_cast(input_col.nesting_depth()); + l_idx++) { auto& out_buf = (*cols)[input_col.nesting[l_idx]]; cols = &out_buf.children; // if this buffer is part of a list hierarchy, we need to determine it's @@ -1713,7 +1718,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // for struct columns, higher levels of the output columns are shared between input // columns. so don't compute any given level more than once. if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) { - int size = sizes[(idx * max_depth) + l_idx]; + auto size = sizes[(idx * max_depth) + l_idx]; // if this is a list column add 1 for non-leaf levels for the terminating offset if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } From b7f9deb816f7073cce0ff7a73380e64ed7509001 Mon Sep 17 00:00:00 2001 From: srikarvanavasam Date: Mon, 3 Apr 2023 18:51:59 -0700 Subject: [PATCH 06/12] review comments --- cpp/src/io/parquet/reader_impl_preprocess.cu | 63 ++++++++++++-------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 5f09ec33811..cdfd02ffc66 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1212,8 +1212,8 @@ struct get_page_schema { }; struct input_col_info { - int const schema_idx; - size_type const nesting_depth; + int const _schema_idx; + size_type const _nesting_depth; }; /** @@ -1223,14 +1223,14 @@ struct input_col_info { * The input index will iterate through pages, nesting depth and column indices in that order. */ struct reduction_indices { - const size_t _page_idx; - const size_type _depth_idx; - const size_type _col_idx; - - __device__ reduction_indices(size_t index, size_type max_depth, size_t num_pages) - : _page_idx(index % num_pages), - _depth_idx((index / num_pages) % max_depth), - _col_idx(index / (max_depth * num_pages)) + size_t const page_idx; + size_type const depth_idx; + size_type const col_idx; + + __device__ reduction_indices(size_t index_, size_type max_depth_, size_t num_pages_) + : page_idx(index_ % num_pages_), + depth_idx((index_ / num_pages_) % max_depth_), + col_idx(index_ / (max_depth_ * num_pages_)) { } }; @@ -1240,8 +1240,8 @@ struct reduction_indices { */ struct get_page_nesting_size { input_col_info const* input_cols; - const size_type max_depth; - const size_t num_pages; + size_type const max_depth; + size_t const num_pages; gpu::PageInfo const* const pages; int const* page_indices; @@ -1249,14 +1249,14 @@ struct get_page_nesting_size { { auto const indices = reduction_indices{index, max_depth, num_pages}; - auto const& page = pages[page_indices[indices._page_idx]]; - if (page.src_col_schema != input_cols[indices._col_idx].schema_idx || + auto const& page = pages[page_indices[indices.page_idx]]; + if (page.src_col_schema != input_cols[indices.col_idx]._schema_idx || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) { + indices.depth_idx >= input_cols[indices.col_idx]._nesting_depth) { return 0; } - return page.nesting[indices._depth_idx].batch_size; + return page.nesting[indices.depth_idx].batch_size; } }; @@ -1330,13 +1330,13 @@ struct start_offset_output_iterator { { auto const indices = reduction_indices{index, max_depth, num_pages}; - gpu::PageInfo const& p = pages[page_indices[indices._page_idx]]; - if (p.src_col_schema != input_cols[indices._col_idx].schema_idx || + gpu::PageInfo const& p = pages[page_indices[indices.page_idx]]; + if (p.src_col_schema != input_cols[indices.col_idx]._schema_idx || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices._depth_idx >= input_cols[indices._col_idx].nesting_depth) { + indices.depth_idx >= input_cols[indices.col_idx]._nesting_depth) { return empty; } - return p.nesting_decode[indices._depth_idx].page_start_value; + return p.nesting_decode[indices.depth_idx].page_start_value; } }; @@ -1666,15 +1666,28 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses _input_columns.cend(), std::back_inserter(h_cols_info), [](auto& col) -> input_col_info { - return {static_cast(col.nesting_depth()), col.schema_idx}; + return {col.schema_idx, static_cast(col.nesting_depth())}; }); + auto const max_depth = (*std::max_element(h_cols_info.cbegin(), h_cols_info.cend(), - [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; })) - .nesting_depth; - auto const d_cols_info = cudf::detail::make_device_uvector_async( - h_cols_info, _stream, rmm::mr::get_current_device_resource()); + [](auto& l, auto& r) { return l._nesting_depth < r._nesting_depth; })) + ._nesting_depth; + + auto const d_cols_info = cudf::detail::make_device_uvector_sync( + h_cols_info, _stream); + + + // hostdevice_vector input_cols{_input_columns.size(), _stream}; + // size_type max_depth = 0; + // for (size_t i = 0; i < _input_columns.size(); i++) { + // auto depth = static_cast(_input_columns[i].nesting_depth()); + // max_depth = depth > max_depth ? depth : max_depth; + // input_cols[i]._nesting_depth = depth; + // input_cols[i]._schema_idx = _input_columns[i].schema_idx; + // } + // input_cols.host_to_device(_stream); // size iterator. indexes pages by sorted order auto const size_input = cudf::detail::make_counting_transform_iterator( From 8376e4e9d6663e5cbb37211c14b10b8d81ca0931 Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Mon, 3 Apr 2023 20:29:28 -0700 Subject: [PATCH 07/12] remove comment --- cpp/src/io/parquet/reader_impl_preprocess.cu | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 2b70bf39186..3cbc187e2d6 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1679,19 +1679,8 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses [](auto& l, auto& r) { return l._nesting_depth < r._nesting_depth; })) ._nesting_depth; - auto const d_cols_info = cudf::detail::make_device_uvector_sync( - h_cols_info, _stream); - - - // hostdevice_vector input_cols{_input_columns.size(), _stream}; - // size_type max_depth = 0; - // for (size_t i = 0; i < _input_columns.size(); i++) { - // auto depth = static_cast(_input_columns[i].nesting_depth()); - // max_depth = depth > max_depth ? depth : max_depth; - // input_cols[i]._nesting_depth = depth; - // input_cols[i]._schema_idx = _input_columns[i].schema_idx; - // } - // input_cols.host_to_device(_stream); + auto const d_cols_info = cudf::detail::make_device_uvector_async( + h_cols_info, _stream, rmm::mr::get_current_device_resource()); // size iterator. indexes pages by sorted order auto const size_input = cudf::detail::make_counting_transform_iterator( From dd4ea710682cb1ca2ceecc6b5a5601afcd80bac8 Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Mon, 3 Apr 2023 20:34:51 -0700 Subject: [PATCH 08/12] Format --- cpp/src/io/parquet/reader_impl_preprocess.cu | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 3cbc187e2d6..577641b1b77 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1216,8 +1216,8 @@ struct get_page_schema { }; struct input_col_info { - int const _schema_idx; - size_type const _nesting_depth; + int const _schema_idx; + size_type const _nesting_depth; }; /** @@ -1668,6 +1668,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses h_cols_info.reserve(_input_columns.size()); std::transform(_input_columns.cbegin(), _input_columns.cend(), + std::back_inserter(h_cols_info), [](auto& col) -> input_col_info { return {col.schema_idx, static_cast(col.nesting_depth())}; From 5a7a51ae84cd6f4239f85a975149a952f2d6d41c Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Mon, 3 Apr 2023 20:35:21 -0700 Subject: [PATCH 09/12] format --- cpp/src/io/parquet/reader_impl_preprocess.cu | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 577641b1b77..2314031202e 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1668,7 +1668,6 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses h_cols_info.reserve(_input_columns.size()); std::transform(_input_columns.cbegin(), _input_columns.cend(), - std::back_inserter(h_cols_info), [](auto& col) -> input_col_info { return {col.schema_idx, static_cast(col.nesting_depth())}; From 42c79458a347665730d32a71eae6042546f5c63b Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Tue, 4 Apr 2023 07:32:22 -0700 Subject: [PATCH 10/12] east const --- cpp/src/io/parquet/reader_impl_preprocess.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 2314031202e..a7dabe6b2a2 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1265,7 +1265,7 @@ struct get_page_nesting_size { }; struct get_reduction_key { - const size_t num_pages; + size_t const num_pages; __device__ size_t operator()(size_t index) const { return index / num_pages; } }; From 84b2a0a335c6b2b2f480f79e0aa30135ad91cfbe Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Tue, 4 Apr 2023 20:17:59 -0700 Subject: [PATCH 11/12] reviews --- cpp/src/io/parquet/reader_impl_preprocess.cu | 21 ++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index a7dabe6b2a2..369bb78a152 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1216,8 +1216,8 @@ struct get_page_schema { }; struct input_col_info { - int const _schema_idx; - size_type const _nesting_depth; + int const schema_idx; + size_type const nesting_depth; }; /** @@ -1243,7 +1243,7 @@ struct reduction_indices { * @brief Returns the size field of a PageInfo struct for a given depth, keyed by schema. */ struct get_page_nesting_size { - input_col_info const* input_cols; + input_col_info const* const input_cols; size_type const max_depth; size_t const num_pages; gpu::PageInfo const* const pages; @@ -1254,9 +1254,9 @@ struct get_page_nesting_size { auto const indices = reduction_indices{index, max_depth, num_pages}; auto const& page = pages[page_indices[indices.page_idx]]; - if (page.src_col_schema != input_cols[indices.col_idx]._schema_idx || + if (page.src_col_schema != input_cols[indices.col_idx].schema_idx || page.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices.depth_idx >= input_cols[indices.col_idx]._nesting_depth) { + indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { return 0; } @@ -1295,7 +1295,7 @@ struct chunk_row_output_iter { * @brief Writes to the page_start_value field of the PageNestingInfo struct, keyed by schema. */ struct start_offset_output_iterator { - gpu::PageInfo* pages; + gpu::PageInfo const* pages; int const* page_indices; size_t cur_index; input_col_info const* input_cols; @@ -1335,9 +1335,9 @@ struct start_offset_output_iterator { auto const indices = reduction_indices{index, max_depth, num_pages}; gpu::PageInfo const& p = pages[page_indices[indices.page_idx]]; - if (p.src_col_schema != input_cols[indices.col_idx]._schema_idx || + if (p.src_col_schema != input_cols[indices.col_idx].schema_idx || p.flags & gpu::PAGEINFO_FLAGS_DICTIONARY || - indices.depth_idx >= input_cols[indices.col_idx]._nesting_depth) { + indices.depth_idx >= input_cols[indices.col_idx].nesting_depth) { return empty; } return p.nesting_decode[indices.depth_idx].page_start_value; @@ -1676,8 +1676,8 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses auto const max_depth = (*std::max_element(h_cols_info.cbegin(), h_cols_info.cend(), - [](auto& l, auto& r) { return l._nesting_depth < r._nesting_depth; })) - ._nesting_depth; + [](auto& l, auto& r) { return l.nesting_depth < r.nesting_depth; })) + .nesting_depth; auto const d_cols_info = cudf::detail::make_device_uvector_async( h_cols_info, _stream, rmm::mr::get_current_device_resource()); @@ -1690,6 +1690,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses auto const reduction_keys = cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()}); + hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; auto const num_keys = _input_columns.size() * max_depth * pages.size(); From 2beac8cfa4ed2154a6129483e643117642795810 Mon Sep 17 00:00:00 2001 From: Srikar Vanavasam Date: Thu, 6 Apr 2023 13:35:57 -0700 Subject: [PATCH 12/12] size_input with transform --- cpp/src/io/parquet/reader_impl_preprocess.cu | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 369bb78a152..72641adb402 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1682,23 +1682,25 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses auto const d_cols_info = cudf::detail::make_device_uvector_async( h_cols_info, _stream, rmm::mr::get_current_device_resource()); + auto const num_keys = _input_columns.size() * max_depth * pages.size(); // size iterator. indexes pages by sorted order - auto const size_input = cudf::detail::make_counting_transform_iterator( - 0, + rmm::device_uvector size_input{num_keys, _stream}; + thrust::transform( + rmm::exec_policy(_stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_keys), + size_input.begin(), get_page_nesting_size{ d_cols_info.data(), max_depth, pages.size(), pages.device_ptr(), page_index.begin()}); - auto const reduction_keys = cudf::detail::make_counting_transform_iterator(0, get_reduction_key{pages.size()}); - hostdevice_vector sizes{_input_columns.size() * max_depth, _stream}; - auto const num_keys = _input_columns.size() * max_depth * pages.size(); // find the size of each column thrust::reduce_by_key(rmm::exec_policy(_stream), reduction_keys, reduction_keys + num_keys, - size_input, + size_input.cbegin(), thrust::make_discard_iterator(), sizes.d_begin()); @@ -1707,7 +1709,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses rmm::exec_policy(_stream), reduction_keys, reduction_keys + num_keys, - size_input, + size_input.cbegin(), start_offset_output_iterator{ pages.device_ptr(), page_index.begin(), 0, d_cols_info.data(), max_depth, pages.size()});