Cap the absolute row index per pass in parquet chunked reader. #15735

Merged
cpp/src/io/parquet/reader_impl_chunking.cu (31 changes: 19 additions & 12 deletions)
@@ -101,7 +101,7 @@ void print_cumulative_page_info(device_span<PageInfo const> d_pages,
     printf("\tP %s: {%lu, %lu, %lu}\n",
            is_list ? "(L)" : "",
            pidx,
-           c_info[pidx].row_index,
+           c_info[pidx].end_row_index,
            c_info[pidx].size_bytes);
   }
 }
@@ -121,16 +121,17 @@ void print_cumulative_row_info(host_span<cumulative_page_info const> sizes,
   printf("------------\nCumulative sizes %s (index, row_index, size_bytes, page_key)\n",
          label.c_str());
   for (size_t idx = 0; idx < sizes.size(); idx++) {
-    printf("{%lu, %lu, %lu, %d}", idx, sizes[idx].row_index, sizes[idx].size_bytes, sizes[idx].key);
+    printf(
+      "{%lu, %lu, %lu, %d}", idx, sizes[idx].end_row_index, sizes[idx].size_bytes, sizes[idx].key);
     if (splits.has_value()) {
       // if we have a split at this row count and this is the last instance of this row count
       auto start = thrust::make_transform_iterator(splits->begin(),
                                                    [](row_range const& i) { return i.skip_rows; });
       auto end   = start + splits->size();
-      auto split = std::find(start, end, sizes[idx].row_index);
+      auto split = std::find(start, end, sizes[idx].end_row_index);
       auto const split_index = [&]() -> int {
-        if (split != end &&
-            ((idx == sizes.size() - 1) || (sizes[idx + 1].row_index > sizes[idx].row_index))) {
+        if (split != end && ((idx == sizes.size() - 1) ||
+                             (sizes[idx + 1].end_row_index > sizes[idx].end_row_index))) {
           return static_cast<int>(std::distance(start, split));
         }
         return idx == 0 ? 0 : -1;
@@ -259,8 +260,9 @@ struct set_row_index {
     auto const& page          = pages[i];
     auto const& chunk         = chunks[page.chunk_idx];
     size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
-    // if we have been passed in a cap, apply it
-    c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
+    // this cap is necessary because in the chunked reader, we use estimations for the row
+    // counts for list columns, which can result in values greater than the absolute number of rows
+    c_info[i].end_row_index = min(max_row, page_end_row);
   }
 };

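As an illustration of the cap that `set_row_index` now applies unconditionally, here is a minimal standalone sketch (plain host-side C++, not the actual device functor, and the numbers are made up): when the estimated end row of a list-column page overshoots the pass, the recorded end row index is clamped to `pass.skip_rows + pass.num_rows`.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
  // hypothetical pass bounds: this pass covers rows [1000, 2000)
  std::size_t const pass_skip_rows = 1000;
  std::size_t const pass_num_rows  = 1000;
  std::size_t const pass_max_row   = pass_skip_rows + pass_num_rows;  // 2000

  // estimated end row for a list-column page; the estimate overshoots the pass
  std::size_t const page_end_row = 2150;

  // the cap: the cumulative page info never reports a row index past the pass
  std::size_t const end_row_index = std::min(pass_max_row, page_end_row);

  std::printf("end_row_index = %zu\n", end_row_index);  // prints 2000, not 2150
  return 0;
}
```

Applying the cap unconditionally, rather than treating a value of 0 as "no cap", removes the special case that previously let estimated list-column row counts leak past the pass boundary.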
@@ -461,6 +463,7 @@ adjust_cumulative_sizes(device_span<cumulative_page_info const> c_info,
                        thrust::make_discard_iterator(),
                        key_offsets.begin())
         .second;
+
   size_t const num_unique_keys = key_offsets_end - key_offsets.begin();
   thrust::exclusive_scan(
     rmm::exec_policy_nosync(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin());
@@ -1292,10 +1295,12 @@ void reader::impl::setup_next_pass(bool uses_custom_row_bounds)
   printf("\tnum_rows: %'lu\n", pass.num_rows);
   printf("\tbase mem usage: %'lu\n", pass.base_mem_size);
   auto const num_columns = _input_columns.size();
+  std::vector<size_type> h_page_offsets =
+    cudf::detail::make_std_vector_sync(pass.page_offsets, _stream);
   for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
     printf("\t\tColumn %'lu: num_pages(%'d)\n",
            c_idx,
-           pass.page_offsets[c_idx + 1] - pass.page_offsets[c_idx]);
+           h_page_offsets[c_idx + 1] - h_page_offsets[c_idx]);
   }
 #endif

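The debug-print changes in this hunk and in the later one at line 1448 follow the same pattern: `pass.page_offsets` and `page_indices` live in device memory, so they are first copied into host vectors (the diff uses `cudf::detail::make_std_vector_sync`, which also synchronizes on the stream) before `printf` indexes into them. Below is a minimal sketch of that device-to-host copy pattern using plain Thrust instead of the cuDF utility; the offsets are made up and it needs to be compiled with nvcc.

```cpp
#include <thrust/copy.h>
#include <thrust/device_vector.h>

#include <cstddef>
#include <cstdio>
#include <vector>

int main()
{
  // device-side page offsets, standing in for pass.page_offsets
  std::vector<int> init{0, 4, 9, 15};
  thrust::device_vector<int> d_offsets(init.begin(), init.end());

  // copy to host before any host-side indexing; dereferencing device memory
  // directly from host code inside the printf would be invalid
  std::vector<int> h_offsets(d_offsets.size());
  thrust::copy(d_offsets.begin(), d_offsets.end(), h_offsets.begin());

  for (std::size_t c = 0; c + 1 < h_offsets.size(); ++c) {
    std::printf("Column %zu: num_pages(%d)\n", c, h_offsets[c + 1] - h_offsets[c]);
  }
  return 0;
}
```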
@@ -1362,11 +1367,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
   // can be considerable.
   include_decompression_scratch_size(pass.chunks, pass.pages, c_info, _stream);

-  auto iter = thrust::make_counting_iterator(0);
+  auto iter               = thrust::make_counting_iterator(0);
+  auto const pass_max_row = pass.skip_rows + pass.num_rows;
   thrust::for_each(rmm::exec_policy_nosync(_stream),
                    iter,
                    iter + pass.pages.size(),
-                   set_row_index{pass.chunks, pass.pages, c_info, 0});
+                   set_row_index{pass.chunks, pass.pages, c_info, pass_max_row});
   // print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);

   // get the next batch of pages
@@ -1448,11 +1454,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
   printf("\t\tTotal expected usage: %'lu\n",
          total_expected_size == 0 ? subpass.decomp_page_data.size() + pass.base_mem_size
                                   : total_expected_size + pass.base_mem_size);
+  std::vector<page_span> h_page_indices = cudf::detail::make_std_vector_sync(page_indices, _stream);
   for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
     printf("\t\tColumn %'lu: pages(%'lu - %'lu)\n",
            c_idx,
-           page_indices[c_idx].start,
-           page_indices[c_idx].end);
+           h_page_indices[c_idx].start,
+           h_page_indices[c_idx].end);
   }
   printf("\t\tOutput chunks:\n");
   for (size_t idx = 0; idx < subpass.output_chunk_read_info.size(); idx++) {