Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed an issue with output chunking computation stemming from input chunking. #14889

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,15 @@ struct set_row_index {
device_span<ColumnChunkDesc const> chunks;
device_span<PageInfo const> pages;
device_span<cumulative_page_info> c_info;
size_t max_row;

__device__ void operator()(size_t i)
{
auto const& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_start_row = chunk.start_row + page.chunk_row + page.num_rows;
c_info[i].row_index = page_start_row;
auto const& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
// if we have been passed in a cap, apply it
c_info[i].row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
}
};

Expand Down Expand Up @@ -1288,7 +1290,7 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + pass.pages.size(),
set_row_index{pass.chunks, pass.pages, c_info});
set_row_index{pass.chunks, pass.pages, c_info, 0});
// print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);

// get the next batch of pages
Expand Down Expand Up @@ -1533,10 +1535,15 @@ void reader::impl::compute_output_chunks_for_subpass()
thrust::equal_to{},
cumulative_page_sum{});
auto iter = thrust::make_counting_iterator(0);
// cap the max row in all pages by the max row we expect in the subpass. input chunking
// can cause "dangling" row counts where for example, only 1 column has a page whose
// maximum row is beyond our expected subpass max row, which will cause an out of
// bounds index in compute_page_splits_by_row.
auto const subpass_max_row = subpass.skip_rows + subpass.num_rows;
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + subpass.pages.size(),
set_row_index{pass.chunks, subpass.pages, c_info});
set_row_index{pass.chunks, subpass.pages, c_info, subpass_max_row});
// print_cumulative_page_info(subpass.pages, c_info, _stream);

// compute the splits
Expand Down
Loading