From 739238dbc78ceee9f44e665329c32c7dcaf0979b Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Tue, 19 Mar 2024 15:25:47 -0700 Subject: [PATCH 1/4] Fixed two list chunked reading related issues: First, row groups of lists being loaded together were not getting their end row counts computed correctly on a per-rowgroup basis. The main consequence there was that we could potentially have generated splits that were larger than we would have liked. But downstream from this was a second bug where we were generating incorrect page indices to be decoded, causing corruption in the decode kernels. --- cpp/src/io/parquet/reader_impl_chunking.cu | 64 +++++++++------ cpp/src/io/parquet/reader_impl_preprocess.cu | 30 ++++--- cpp/tests/io/parquet_chunked_reader_test.cu | 86 +++++++++++++++++++- 3 files changed, 135 insertions(+), 45 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 9c14902ef2f..f043770e839 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -48,9 +48,9 @@ struct split_info { }; struct cumulative_page_info { - size_t row_index; // row index - size_t size_bytes; // cumulative size in bytes - int key; // schema index + size_t end_row_index; // end row index (exclusive) + size_t size_bytes; // cumulative size in bytes + int key; // schema index }; // the minimum amount of memory we can safely expect to be enough to @@ -260,7 +260,7 @@ struct set_row_index { auto const& chunk = chunks[page.chunk_idx]; size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows; // if we have been passed in a cap, apply it - c_info[i].row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row; + c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row; } }; @@ -293,13 +293,13 @@ struct page_total_size { auto const end = key_offsets[idx + 1]; auto iter = cudf::detail::make_counting_transform_iterator( 0, cuda::proclaim_return_type([&] __device__(size_type i) { - return c_info[i].row_index; + return c_info[i].end_row_index; })); auto const page_index = - thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_index) - iter; + thrust::lower_bound(thrust::seq, iter + start, iter + end, i.end_row_index) - iter; sum += c_info[page_index].size_bytes; } - return {i.row_index, sum, i.key}; + return {i.end_row_index, sum, i.key}; } }; @@ -318,14 +318,15 @@ size_t find_start_index(cudf::host_span aggregated_i size_t start_row) { auto start = thrust::make_transform_iterator( - aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.row_index; }); + aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.end_row_index; }); auto start_index = thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) - start; - // cumulative_page_info.row_index is the -end- of the rows of a given page. so move forward until - // we find the next group of pages - while (start_index < (static_cast(aggregated_info.size()) - 1) && - (start_index < 0 || aggregated_info[start_index].row_index == start_row)) { + // list rows can span page boundaries, so it is not always safe to assume that the row + // represented by end_row_index starts on the subsequent page. It is possible that + // the values for row end_row_index start within the page itself. so we must + // include the page in that case. 
+ while (start_index < (static_cast(aggregated_info.size()) - 1) && (start_index < 0)) { start_index++; } @@ -353,16 +354,18 @@ int64_t find_next_split(int64_t cur_pos, int64_t split_pos = thrust::lower_bound(thrust::seq, start + cur_pos, end, size_limit) - start; // if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back - // one. + // one. note that in the case where we can't even fit the current set of rows within the size + // limit, this will cause split_pos to go below cur_pos. but that is handled in the loop below. if (static_cast(split_pos) >= sizes.size() || (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit)) { split_pos--; } - // cumulative_page_info.row_index is the -end- of the rows of a given page. so move forward until - // we find the next group of pages + // move forward until we find the next group of pages that will actually advance our row count. + // this guarantees that even if we cannot fit the set of rows represented by our where our cur_pos + // is, we will still move forward instead of failing. while (split_pos < (static_cast(sizes.size()) - 1) && - (split_pos < 0 || sizes[split_pos].row_index == cur_row_index)) { + (split_pos < cur_pos || sizes[split_pos].end_row_index == cur_row_index)) { split_pos++; } @@ -413,7 +416,7 @@ template struct row_count_less { __device__ bool operator()(cumulative_page_info const& a, cumulative_page_info const& b) const { - return a.row_index < b.row_index; + return a.end_row_index < b.end_row_index; } }; @@ -501,10 +504,10 @@ struct page_span { size_t start, end; }; -struct get_page_row_index { +struct get_page_end_row_index { device_span c_info; - __device__ size_t operator()(size_t i) const { return c_info[i].row_index; } + __device__ size_t operator()(size_t i) const { return c_info[i].end_row_index; } }; /** @@ -514,15 +517,18 @@ struct get_page_row_index { template struct get_page_span { device_span page_offsets; + device_span chunks; RowIndexIter page_row_index; size_t const start_row; size_t const end_row; get_page_span(device_span _page_offsets, + device_span _chunks, RowIndexIter _page_row_index, size_t _start_row, size_t _end_row) : page_offsets(_page_offsets), + chunks(_chunks), page_row_index(_page_row_index), start_row(_start_row), end_row(_end_row) @@ -535,12 +541,17 @@ struct get_page_span { auto const column_page_start = page_row_index + first_page_index; auto const column_page_end = page_row_index + page_offsets[column_index + 1]; auto const num_pages = column_page_end - column_page_start; + bool const is_list = chunks[column_index].max_level[level_type::REPETITION] > 0; auto start_page = (thrust::lower_bound(thrust::seq, column_page_start, column_page_end, start_row) - column_page_start) + first_page_index; - if (page_row_index[start_page] == start_row) { start_page++; } + // list rows can span page boundaries, so it is not always safe to assume that the row + // represented by end_row_index starts on the subsequent page. It is possible that + // the values for row end_row_index start within the page itself. so we must + // include the page in that case. 
+ if (page_row_index[start_page] == start_row && !is_list) { start_page++; } auto end_page = (thrust::lower_bound(thrust::seq, column_page_start, column_page_end, end_row) - column_page_start) + @@ -623,6 +634,7 @@ struct copy_subpass_page { * * @param c_info The cumulative page size information (row count and byte size) per column * @param pages All of the pages in the pass + * @param chunks All of the chunks in the pass * @param page_offsets Offsets into the pages array representing the first page for each column * @param start_row The row to start the subpass at * @param size_limit The size limit in bytes of the subpass @@ -636,6 +648,7 @@ struct copy_subpass_page { std::tuple, size_t, size_t> compute_next_subpass( device_span c_info, device_span pages, + device_span chunks, device_span page_offsets, size_t start_row, size_t size_limit, @@ -658,18 +671,18 @@ std::tuple, size_t, size_t> compute_next_subpass( start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes; auto const end_index = find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit); - auto const end_row = h_aggregated_info[end_index].row_index; + auto const end_row = h_aggregated_info[end_index].end_row_index; // for each column, collect the set of pages that spans start_row / end_row rmm::device_uvector page_bounds(num_columns, stream); auto iter = thrust::make_counting_iterator(size_t{0}); auto page_row_index = - cudf::detail::make_counting_transform_iterator(0, get_page_row_index{c_info}); + cudf::detail::make_counting_transform_iterator(0, get_page_end_row_index{c_info}); thrust::transform(rmm::exec_policy_nosync(stream), iter, iter + num_columns, page_bounds.begin(), - get_page_span{page_offsets, page_row_index, start_row, end_row}); + get_page_span{page_offsets, chunks, page_row_index, start_row, end_row}); // total page count over all columns auto page_count_iter = thrust::make_transform_iterator(page_bounds.begin(), get_span_size{}); @@ -700,13 +713,13 @@ std::vector compute_page_splits_by_row(device_span pages; device_span chunks; - device_span page_offsets; - size_t const max_row; __device__ void operator()(size_t i) { - auto const last_page_index = page_offsets[i + 1] - 1; - auto const& page = pages[last_page_index]; - auto const& chunk = chunks[page.chunk_idx]; - size_t const page_start_row = chunk.start_row + page.chunk_row; - pages[last_page_index].num_rows = max_row - page_start_row; + auto& page = pages[i]; + auto const& chunk = chunks[page.chunk_idx]; + // only do this for the last page in each chunk + if (i < pages.size() && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; } + size_t const page_start_row = chunk.start_row + page.chunk_row; + size_t const chunk_last_row = chunk.start_row + chunk.num_rows; + page.num_rows = chunk_last_row - page_start_row; } }; @@ -1300,17 +1300,15 @@ void reader::impl::generate_list_column_row_count_estimates() chunk_row_output_iter{pass.pages.device_ptr()}); } - // finally, fudge the last page for each column such that it ends on the real known row count - // for the pass. this is so that as we march through the subpasses, we will find that every column - // cleanly ends up the expected row count at the row group boundary. 
- auto const& last_chunk = pass.chunks[pass.chunks.size() - 1]; - auto const num_columns = _input_columns.size(); - size_t const max_row = last_chunk.start_row + last_chunk.num_rows; - auto iter = thrust::make_counting_iterator(0); + // to compensate for the list row size estimates, force the row count on the last page for each + // column chunk (each rowgroup) such that it ends on the real known row count. this is so that as + // we march through the subpasses, we will find that every column cleanly ends up the expected row + // count at the row group boundary and our split computations work correctly. + auto iter = thrust::make_counting_iterator(0); thrust::for_each(rmm::exec_policy_nosync(_stream), iter, - iter + num_columns, - set_final_row_count{pass.pages, pass.chunks, pass.page_offsets, max_row}); + iter + pass.pages.size(), + set_final_row_count{pass.pages, pass.chunks}); pass.chunks.device_to_host_async(_stream); pass.pages.device_to_host_async(_stream); diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 2c992677a65..019dd7c710a 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -109,12 +109,12 @@ auto write_file(std::vector>& input_columns, return std::pair{std::move(input_table), std::move(filepath)}; } -auto chunked_read(std::string const& filepath, +auto chunked_read(std::vector const& filepaths, std::size_t output_limit, std::size_t input_limit = 0) { auto const read_opts = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepaths}).build(); auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts); auto num_chunks = 0; @@ -139,6 +139,14 @@ auto chunked_read(std::string const& filepath, return std::pair(cudf::concatenate(out_tviews), num_chunks); } +auto chunked_read(std::string const& filepath, + std::size_t output_limit, + std::size_t input_limit = 0) +{ + std::vector vpath{filepath}; + return chunked_read(vpath, output_limit, input_limit); +} + } // namespace struct ParquetChunkedReaderTest : public cudf::test::BaseFixture {}; @@ -1111,7 +1119,7 @@ TEST_F(ParquetChunkedReaderInputLimitConstrainedTest, SingleFixedWidthColumn) input_limit_test_write(test_filenames, tbl); // semi-reasonable limit - constexpr int expected_a[] = {1, 17, 4, 1}; + constexpr int expected_a[] = {1, 25, 5, 1}; input_limit_test_read(test_filenames, tbl, 0, 2 * 1024 * 1024, expected_a); // an unreasonable limit constexpr int expected_b[] = {1, 50, 50, 1}; @@ -1143,7 +1151,7 @@ TEST_F(ParquetChunkedReaderInputLimitConstrainedTest, MixedColumns) input_limit_test_write(test_filenames, tbl); - constexpr int expected_a[] = {1, 50, 10, 7}; + constexpr int expected_a[] = {1, 50, 13, 7}; input_limit_test_read(test_filenames, tbl, 0, 2 * 1024 * 1024, expected_a); constexpr int expected_b[] = {1, 50, 50, 50}; input_limit_test_read(test_filenames, tbl, 0, 1, expected_b); @@ -1225,6 +1233,76 @@ TEST_F(ParquetChunkedReaderInputLimitTest, List) input_limit_test_read(test_filenames, tbl, 128 * 1024 * 1024, 512 * 1024 * 1024, expected_c); } +void tiny_list_rowgroup_test(bool just_list_col) +{ + auto iter = thrust::make_counting_iterator(0); + + // test a specific edge case: a list column composed of multiple row groups, where each row + // group contains a single, relatively small row. 
+ std::vector row_sizes{12, 7, 16, 20, 10, 3, 15}; + std::vector> row_groups; + for (size_t idx = 0; idx < row_sizes.size(); idx++) { + std::vector> cols; + + // add a column before the list + if (!just_list_col) { + cudf::test::fixed_width_column_wrapper int_col({idx}); + cols.push_back(int_col.release()); + } + + // write out the single-row list column as it's own file + cudf::test::fixed_width_column_wrapper values(iter, iter + row_sizes[idx]); + cudf::test::fixed_width_column_wrapper offsets({0, row_sizes[idx]}); + cols.push_back(cudf::make_lists_column(1, offsets.release(), values.release(), 0, {})); + + // add a column after the list + if (!just_list_col) { + cudf::test::fixed_width_column_wrapper float_col({idx}); + cols.push_back(float_col.release()); + } + + auto tbl = std::make_unique(std::move(cols)); + + auto filepath = temp_env->get_temp_filepath("Tlrg" + std::to_string(idx)); + auto const write_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *tbl).build(); + cudf::io::write_parquet(write_opts); + + // store off the table + row_groups.push_back(std::move(tbl)); + } + + // build expected + std::vector views; + std::transform(row_groups.begin(), + row_groups.end(), + std::back_inserter(views), + [](std::unique_ptr const& tbl) { return tbl->view(); }); + auto expected = cudf::concatenate(views); + + // load the individual files all at once + std::vector source_files; + std::transform(iter, iter + row_groups.size(), std::back_inserter(source_files), [](int i) { + return temp_env->get_temp_filepath("Tlrg" + std::to_string(i)); + }); + auto result = + chunked_read(source_files, size_t{2} * 1024 * 1024 * 1024, size_t{2} * 1024 * 1024 * 1024); + + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *(result.first)); +} + +TEST_F(ParquetChunkedReaderInputLimitTest, TinyListRowGroupsSingle) +{ + // test with just a single list column + tiny_list_rowgroup_test(true); +} + +TEST_F(ParquetChunkedReaderInputLimitTest, TinyListRowGroupsMixed) +{ + // test with other columns mixed in + tiny_list_rowgroup_test(false); +} + struct char_values { __device__ int8_t operator()(int i) { From 23934c1e83231966a81f81e18a57dd94218026c2 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Wed, 20 Mar 2024 07:57:43 -0700 Subject: [PATCH 2/4] Fix indexing issue. --- cpp/src/io/parquet/reader_impl_preprocess.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index e5dab24da67..e39445108a6 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -706,7 +706,7 @@ struct set_final_row_count { auto& page = pages[i]; auto const& chunk = chunks[page.chunk_idx]; // only do this for the last page in each chunk - if (i < pages.size() && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; } + if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; } size_t const page_start_row = chunk.start_row + page.chunk_row; size_t const chunk_last_row = chunk.start_row + chunk.num_rows; page.num_rows = chunk_last_row - page_start_row; From ec63720ccae7d7280261040d2266522ee8119233 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Wed, 20 Mar 2024 12:18:41 -0700 Subject: [PATCH 3/4] Remove loop that is now redundant in find_start_index. 
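
In short: after the get_page_span change in the first commit, the forward-scan loop in
find_start_index can only ever trigger on its (start_index < 0) clause, and a lower_bound
result is never negative, so the function reduces to a plain lower bound over end_row_index.
A minimal host-only sketch of the reduced logic (std::lower_bound standing in for
thrust::lower_bound, and page_info_sketch / find_start_index_sketch standing in for the real
cumulative_page_info machinery; these names are illustrative, not the cuDF symbols):

    #include <algorithm>
    #include <cstddef>
    #include <vector>

    struct page_info_sketch {
      std::size_t end_row_index;  // end row (start_row + num_rows) of the page
      std::size_t size_bytes;     // cumulative size in bytes
    };

    // first page whose end row index reaches start_row; no forward-scan loop is needed
    // because the lower_bound result can never be negative
    std::size_t find_start_index_sketch(std::vector<page_info_sketch> const& aggregated_info,
                                        std::size_t start_row)
    {
      auto const it = std::lower_bound(
        aggregated_info.begin(), aggregated_info.end(), start_row,
        [](page_info_sketch const& p, std::size_t row) { return p.end_row_index < row; });
      return static_cast<std::size_t>(it - aggregated_info.begin());
    }

The real code expresses the same comparison with a transform iterator over end_row_index so
it can work directly on the cumulative_page_info span.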
--- cpp/src/io/parquet/reader_impl_chunking.cu | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index f043770e839..c9b41cf39c4 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -319,18 +319,8 @@ size_t find_start_index(cudf::host_span aggregated_i { auto start = thrust::make_transform_iterator( aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.end_row_index; }); - auto start_index = - thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) - start; - - // list rows can span page boundaries, so it is not always safe to assume that the row - // represented by end_row_index starts on the subsequent page. It is possible that - // the values for row end_row_index start within the page itself. so we must - // include the page in that case. - while (start_index < (static_cast(aggregated_info.size()) - 1) && (start_index < 0)) { - start_index++; - } - - return start_index; + return thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) - + start; } /** From 9a8a9ba99076bce29e234783524fe9b7c8a08098 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Wed, 20 Mar 2024 14:20:04 -0700 Subject: [PATCH 4/4] Comment clarification. Simplified some logic in find_next_split to avoid unnecessarily looping in some cases. --- cpp/src/io/parquet/reader_impl_chunking.cu | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index c9b41cf39c4..5c387147e4b 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -48,7 +48,7 @@ struct split_info { }; struct cumulative_page_info { - size_t end_row_index; // end row index (exclusive) + size_t end_row_index; // end row index (start_row + num_rows for the corresponding page) size_t size_bytes; // cumulative size in bytes int key; // schema index }; @@ -344,10 +344,9 @@ int64_t find_next_split(int64_t cur_pos, int64_t split_pos = thrust::lower_bound(thrust::seq, start + cur_pos, end, size_limit) - start; // if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back - // one. note that in the case where we can't even fit the current set of rows within the size - // limit, this will cause split_pos to go below cur_pos. but that is handled in the loop below. + // one as long as this doesn't put us before our starting point. if (static_cast(split_pos) >= sizes.size() || - (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit)) { + ((split_pos > cur_pos) && (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit))) { split_pos--; } @@ -355,7 +354,7 @@ int64_t find_next_split(int64_t cur_pos, // this guarantees that even if we cannot fit the set of rows represented by our where our cur_pos // is, we will still move forward instead of failing. while (split_pos < (static_cast(sizes.size()) - 1) && - (split_pos < cur_pos || sizes[split_pos].end_row_index == cur_row_index)) { + (sizes[split_pos].end_row_index == cur_row_index)) { split_pos++; }
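
Taken together, after this series find_next_split amounts to: take the lower bound on size
relative to where the last split ended, step back one position only if that does not move
before the starting point, then walk forward until the end row index actually advances past
the current row. A host-only sketch of that logic (standard C++ in place of thrust, with
cumulative_page_info_sketch and find_next_split_sketch as illustrative stand-ins, and assuming
the transform iterator in the real code yields sizes relative to cur_cumulative_size):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct cumulative_page_info_sketch {
      std::size_t end_row_index;  // end row index (start_row + num_rows for the page)
      std::size_t size_bytes;     // cumulative size in bytes
    };

    std::int64_t find_next_split_sketch(std::int64_t cur_pos,
                                        std::size_t cur_row_index,
                                        std::size_t cur_cumulative_size,
                                        std::vector<cumulative_page_info_sketch> const& sizes,
                                        std::size_t size_limit)
    {
      // first position whose size, relative to the current split point, reaches the limit
      std::int64_t split_pos =
        std::lower_bound(sizes.begin() + cur_pos, sizes.end(), size_limit,
                         [&](cumulative_page_info_sketch const& c, std::size_t limit) {
                           return c.size_bytes - cur_cumulative_size < limit;
                         }) -
        sizes.begin();

      // past the end, or over the limit: move back one, but never before the starting point
      if (static_cast<std::size_t>(split_pos) >= sizes.size() ||
          ((split_pos > cur_pos) &&
           (sizes[split_pos].size_bytes - cur_cumulative_size > size_limit))) {
        split_pos--;
      }

      // advance until the end row index moves past cur_row_index, so progress is made even
      // when the current set of rows alone exceeds the limit
      while (split_pos < (static_cast<std::int64_t>(sizes.size()) - 1) &&
             (sizes[split_pos].end_row_index == cur_row_index)) {
        split_pos++;
      }
      return split_pos;
    }

With the (split_pos > cur_pos) guard, the back-up step can no longer move the split before the
starting position, which is what made the extra (split_pos < cur_pos) clause in the follow-up
loop unnecessary.

And the per-page row-count fix from the first two commits amounts to: for every page that is
the last page of its column chunk, force the page's row count so that the chunk ends exactly
on its known row-group row count. A simplified host-side sketch of that functor, with
page_sketch and chunk_sketch as stand-ins for the page and column-chunk descriptors the
reader actually uses:

    #include <cstddef>
    #include <vector>

    struct page_sketch {
      int chunk_idx;          // index of the owning column chunk
      std::size_t chunk_row;  // row offset of the page within its chunk
      std::size_t num_rows;   // (estimated) number of rows in the page
    };

    struct chunk_sketch {
      std::size_t start_row;  // first row of the chunk within the pass
      std::size_t num_rows;   // known row count of the chunk (row group)
    };

    void set_final_row_counts(std::vector<page_sketch>& pages,
                              std::vector<chunk_sketch> const& chunks)
    {
      for (std::size_t i = 0; i < pages.size(); i++) {
        auto& page        = pages[i];
        auto const& chunk = chunks[page.chunk_idx];
        // only adjust the last page in each chunk
        if (i < pages.size() - 1 && pages[i + 1].chunk_idx == page.chunk_idx) { continue; }
        std::size_t const page_start_row = chunk.start_row + page.chunk_row;
        std::size_t const chunk_last_row = chunk.start_row + chunk.num_rows;
        page.num_rows                    = chunk_last_row - page_start_row;
      }
    }

The i < pages.size() - 1 bound is the off-by-one fix from the second commit; without it the
check would read pages[i + 1] past the end of the array for the final page in the pass.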