diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu
index 912f53a8277..f4fb6bc57e6 100644
--- a/cpp/src/io/parquet/reader_impl_chunking.cu
+++ b/cpp/src/io/parquet/reader_impl_chunking.cu
@@ -101,7 +101,7 @@ void print_cumulative_page_info(device_span<PageInfo const> d_pages,
       printf("\tP %s: {%lu, %lu, %lu}\n",
              is_list ? "(L)" : "",
              pidx,
-             c_info[pidx].row_index,
+             c_info[pidx].end_row_index,
              c_info[pidx].size_bytes);
   }
 }
@@ -121,16 +121,17 @@ void print_cumulative_row_info(host_span<cumulative_page_info const> sizes,
   printf("------------\nCumulative sizes %s (index, row_index, size_bytes, page_key)\n",
          label.c_str());
   for (size_t idx = 0; idx < sizes.size(); idx++) {
-    printf("{%lu, %lu, %lu, %d}", idx, sizes[idx].row_index, sizes[idx].size_bytes, sizes[idx].key);
+    printf(
+      "{%lu, %lu, %lu, %d}", idx, sizes[idx].end_row_index, sizes[idx].size_bytes, sizes[idx].key);
     if (splits.has_value()) {
       // if we have a split at this row count and this is the last instance of this row count
       auto start = thrust::make_transform_iterator(splits->begin(),
                                                    [](row_range const& i) { return i.skip_rows; });
       auto end               = start + splits->size();
-      auto split             = std::find(start, end, sizes[idx].row_index);
+      auto split             = std::find(start, end, sizes[idx].end_row_index);
       auto const split_index = [&]() -> int {
-        if (split != end &&
-            ((idx == sizes.size() - 1) || (sizes[idx + 1].row_index > sizes[idx].row_index))) {
+        if (split != end && ((idx == sizes.size() - 1) ||
+                             (sizes[idx + 1].end_row_index > sizes[idx].end_row_index))) {
           return static_cast<int>(std::distance(start, split));
         }
         return idx == 0 ? 0 : -1;
@@ -259,8 +260,9 @@ struct set_row_index {
     auto const& page          = pages[i];
     auto const& chunk         = chunks[page.chunk_idx];
     size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
-    // if we have been passed in a cap, apply it
-    c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
+    // this cap is necessary because in the chunked reader, we use estimations for the row
+    // counts for list columns, which can result in values greater than the absolute number of rows.
+    c_info[i].end_row_index = min(max_row, page_end_row);
   }
 };
 
@@ -461,6 +463,7 @@ adjust_cumulative_sizes(device_span<cumulative_page_info const> c_info,
                                          thrust::make_discard_iterator(),
                                          key_offsets.begin())
                            .second;
+  size_t const num_unique_keys = key_offsets_end - key_offsets.begin();
   thrust::exclusive_scan(
     rmm::exec_policy_nosync(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin());
 
@@ -1292,10 +1295,12 @@ void reader::impl::setup_next_pass(bool uses_custom_row_bounds)
   printf("\tnum_rows: %'lu\n", pass.num_rows);
   printf("\tbase mem usage: %'lu\n", pass.base_mem_size);
   auto const num_columns = _input_columns.size();
+  std::vector<size_type> h_page_offsets =
+    cudf::detail::make_std_vector_sync(pass.page_offsets, _stream);
   for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
     printf("\t\tColumn %'lu: num_pages(%'d)\n",
            c_idx,
-           pass.page_offsets[c_idx + 1] - pass.page_offsets[c_idx]);
+           h_page_offsets[c_idx + 1] - h_page_offsets[c_idx]);
   }
 #endif
 
@@ -1362,11 +1367,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
   // can be considerable.
   include_decompression_scratch_size(pass.chunks, pass.pages, c_info, _stream);
 
-  auto iter = thrust::make_counting_iterator(0);
+  auto iter               = thrust::make_counting_iterator(0);
+  auto const pass_max_row = pass.skip_rows + pass.num_rows;
   thrust::for_each(rmm::exec_policy_nosync(_stream),
                    iter,
                    iter + pass.pages.size(),
-                   set_row_index{pass.chunks, pass.pages, c_info, 0});
+                   set_row_index{pass.chunks, pass.pages, c_info, pass_max_row});
   // print_cumulative_page_info(pass.pages, pass.chunks, c_info, _stream);
 
   // get the next batch of pages
@@ -1448,11 +1454,12 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
   printf("\t\tTotal expected usage: %'lu\n",
          total_expected_size == 0 ? subpass.decomp_page_data.size() + pass.base_mem_size
                                   : total_expected_size + pass.base_mem_size);
+  std::vector<page_span> h_page_indices = cudf::detail::make_std_vector_sync(page_indices, _stream);
   for (size_t c_idx = 0; c_idx < num_columns; c_idx++) {
     printf("\t\tColumn %'lu: pages(%'lu - %'lu)\n",
            c_idx,
-           page_indices[c_idx].start,
-           page_indices[c_idx].end);
+           h_page_indices[c_idx].start,
+           h_page_indices[c_idx].end);
   }
   printf("\t\tOutput chunks:\n");
   for (size_t idx = 0; idx < subpass.output_chunk_read_info.size(); idx++) {
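
Reviewer note: the behavioral core of this change is in set_row_index. The cap on a page's
end row index is now applied unconditionally, because the row counts computed for list-column
pages are estimates and can overshoot the true row count of the pass; accordingly, the subpass
call site now passes pass.skip_rows + pass.num_rows instead of 0 (with an unconditional min, a
cap of 0 would clamp every page's end row index to zero). Below is a minimal host-side sketch
of the clamping logic; chunk_desc, page_desc, and end_row_index are hypothetical simplified
stand-ins for the reader's device-side types, not the actual cuDF implementation:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    // Hypothetical, simplified stand-ins for the reader's chunk/page metadata.
    struct chunk_desc {
      std::size_t start_row;  // absolute row at which this column chunk starts
    };
    struct page_desc {
      std::size_t chunk_row;  // row offset of this page within its chunk
      std::size_t num_rows;   // (possibly estimated) number of rows in this page
    };

    // Mirrors the fixed logic: always clamp to max_row, since list-column row
    // counts are estimates and page_end_row can overshoot the real pass bounds.
    std::size_t end_row_index(chunk_desc const& chunk, page_desc const& page, std::size_t max_row)
    {
      std::size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
      return std::min(max_row, page_end_row);
    }

    int main()
    {
      chunk_desc const chunk{100};
      page_desc const page{50, 60};  // over-estimated list page: claims rows [150, 210)
      // pass_max_row corresponds to pass.skip_rows + pass.num_rows; the pass
      // actually ends at absolute row 180.
      std::size_t const pass_max_row = 180;
      std::printf("%zu\n", end_row_index(chunk, page, pass_max_row));  // prints 180, not 210
      return 0;
    }

The debug-print changes follow a single pattern in both places: pass.page_offsets and
page_indices live in device memory, so they are copied to host vectors with
cudf::detail::make_std_vector_sync before being indexed from host-side printf calls.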