Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed an issue with output chunking computation stemming from input chunking. #14889

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,15 @@ struct set_row_index {
device_span<ColumnChunkDesc const> chunks;
device_span<PageInfo const> pages;
device_span<cumulative_page_info> c_info;
size_t const max_row;
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved

__device__ void operator()(size_t i)
{
auto const& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_start_row = chunk.start_row + page.chunk_row + page.num_rows;
c_info[i].row_index = page_start_row;
auto const& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
// if we have been passed a cap, apply it
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
c_info[i].row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
}
};

Expand Down Expand Up @@ -1288,7 +1290,7 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + pass.pages.size(),
set_row_index{pass.chunks, pass.pages, c_info});
set_row_index{pass.chunks, pass.pages, c_info, 0});
// print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);

// get the next batch of pages
Expand Down Expand Up @@ -1533,10 +1535,15 @@ void reader::impl::compute_output_chunks_for_subpass()
thrust::equal_to{},
cumulative_page_sum{});
auto iter = thrust::make_counting_iterator(0);
// Cap the max row in all pages by the max row we expect in the subpass. Input chunking
// can cause "dangling" row counts where, for example, only one column has a page whose
// maximum row is beyond our expected subpass max row, which would cause an out-of-bounds
// index in compute_page_splits_by_row.
auto const subpass_max_row = subpass.skip_rows + subpass.num_rows;
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + subpass.pages.size(),
set_row_index{pass.chunks, subpass.pages, c_info});
set_row_index{pass.chunks, subpass.pages, c_info, subpass_max_row});
// print_cumulative_page_info(subpass.pages, c_info, _stream);

// compute the splits
Expand Down
Loading