From c249f052394d79effad7feb00a8ccd5038518971 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 4 Jul 2024 01:14:26 +0000 Subject: [PATCH 01/22] Calculate num rows read from each data source by PQ reader --- cpp/include/cudf/io/types.hpp | 2 + cpp/src/io/parquet/reader_impl.cpp | 87 +++++++++++++++++++- cpp/src/io/parquet/reader_impl.hpp | 21 ++++- cpp/src/io/parquet/reader_impl_chunking.hpp | 6 ++ cpp/src/io/parquet/reader_impl_helpers.cpp | 25 ++++-- cpp/src/io/parquet/reader_impl_helpers.hpp | 20 ++--- cpp/src/io/parquet/reader_impl_preprocess.cu | 19 ++++- 7 files changed, 156 insertions(+), 24 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 0c96268f6c7..8cc48105934 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -277,6 +277,8 @@ struct column_name_info { struct table_metadata { std::vector schema_info; //!< Detailed name information for the entire output hierarchy + std::vector num_rows_per_source; //!< Number of rows read from each data source. Empty + //!< vector if AST filters are being used. std::map user_data; //!< Format-dependent metadata of the first input //!< file as key-values pairs (deprecated) std::vector> diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index f705f6626e7..19e003f7867 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -26,6 +26,7 @@ #include +#include #include #include @@ -549,7 +550,24 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) out_columns.reserve(_output_buffers.size()); // no work to do (this can happen on the first pass if we have no rows to read) - if (!has_more_work()) { return finalize_output(out_metadata, out_columns); } + if (!has_more_work()) { + // Compute number of rows per source if no AST filters + if (not _expr_conv.get_converted_expr().has_value()) { + // If already empty (empty dataframe case), simply initialize to a list of zeros + if (not out_metadata.num_rows_per_source.size()) { + out_metadata.num_rows_per_source = + std::vector(out_metadata.num_rows_per_source.size(), 0); + } + // If this is previously non-empty, simply fill in zeros + else { + thrust::fill( + out_metadata.num_rows_per_source.begin(), out_metadata.num_rows_per_source.end(), 0); + } + } + + // Finalize output + return finalize_output(mode, out_metadata, out_columns); + } auto& pass = *_pass_itm_data; auto& subpass = *pass.subpass; @@ -585,11 +603,74 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) } } + // Compute number of rows per source if no AST filters + if (not _expr_conv.get_converted_expr().has_value()) { + // For chunked reading, compute the output number of rows per source + if (mode == read_mode::CHUNKED_READ) { + out_metadata.num_rows_per_source = + calculate_output_num_rows_per_source(read_info.skip_rows, read_info.num_rows); + } + // Simply move the number of rows per file if reading all at once + else { + // Move is okay here we are reading in one go. + out_metadata.num_rows_per_source = std::move(_file_itm_data.num_rows_per_source); + } + } + // Add empty columns if needed. Filter output columns based on filter. 
- return finalize_output(out_metadata, out_columns); + return finalize_output(mode, out_metadata, out_columns); +} + +std::vector reader::impl::calculate_output_num_rows_per_source(size_t const start_row, + size_t const num_rows) +{ + std::vector num_rows_per_source(_file_itm_data.num_rows_per_source.size(), 0); + + // TODO: Modify to simply end_row = start_row + num_rows once + // https://github.com/rapidsai/cudf/issues/16186 is fixed. + auto const end_row = std::min(_file_itm_data.global_skip_rows + _file_itm_data.global_num_rows, + start_row + num_rows); + CUDF_EXPECTS(start_row < end_row and + end_row <= _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows, + "Encountered invalid output chunk row bounds."); + + // Copy reference to a const local variable for better readability + auto const& partial_sum_nrows_source = _file_itm_data.exclusive_sum_num_rows_per_source; + + // Binary search start_row and end_row in exclusive_sum_num_rows_per_source array + auto const start_iter = thrust::upper_bound( + partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row); + auto const end_iter = + (end_row == _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows) + ? partial_sum_nrows_source.cend() - 1 + : thrust::upper_bound(start_iter, partial_sum_nrows_source.cend(), end_row); + + // Compute the array offset index for both iterators + auto const start_idx = std::distance(partial_sum_nrows_source.cbegin(), start_iter); + auto const end_idx = std::distance(partial_sum_nrows_source.cbegin(), end_iter); + + CUDF_EXPECTS(start_idx <= end_idx, + "Encountered invalid source files indexes for output chunk row bounds"); + + // If the entire chunk is from the same source file, then the count is simply num_rows + if (start_idx == end_idx) { + num_rows_per_source[start_idx] = num_rows; + } else { + // Compute the number of rows from the first source file + num_rows_per_source[start_idx] = partial_sum_nrows_source[start_idx] - start_row; + // Compute the number of rows from the last source file + num_rows_per_source[end_idx] = end_row - partial_sum_nrows_source[end_idx - 1]; + // Simply copy the number of rows for each source in range: (start_idx, end_idx) + thrust::copy(_file_itm_data.num_rows_per_source.cbegin() + start_idx + 1, + _file_itm_data.num_rows_per_source.cbegin() + end_idx, + num_rows_per_source.begin() + start_idx + 1); + } + + return num_rows_per_source; } -table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata, +table_with_metadata reader::impl::finalize_output(read_mode mode, + table_metadata& out_metadata, std::vector>& out_columns) { // Create empty columns as needed (this can happen if we've ended up with no actual data to read) diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 3b8e80a29e6..c4565280957 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -262,11 +262,13 @@ class reader::impl { * @brief Finalize the output table by adding empty columns for the non-selected columns in * schema. 
* + * @param read_mode Value indicating if the data sources are read all at once or chunk by chunk * @param out_metadata The output table metadata * @param out_columns The columns for building the output table * @return The output table along with columns' metadata */ - table_with_metadata finalize_output(table_metadata& out_metadata, + table_with_metadata finalize_output(read_mode mode, + table_metadata& out_metadata, std::vector>& out_columns); /** @@ -336,11 +338,26 @@ class reader::impl { : true; } + /** + * @brief Check if this is the first output chunk + * + * @return True if this is the first output chunk + */ [[nodiscard]] bool is_first_output_chunk() const { return _file_itm_data._output_chunk_count == 0; } + /** + * @brief Calculate the number of rows read from each source in the output chunk + * + * @param start_row The offset of the first row in the output chunk + * @param num_rows The number of rows in the the output chunk + * @return Vector of number of rows from each respective data source in the output chunk + */ + [[nodiscard]] std::vector calculate_output_num_rows_per_source(size_t const start_row, + size_t const num_rows); + rmm::cuda_stream_view _stream; rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()}; @@ -387,7 +404,7 @@ class reader::impl { // chunked reading happens in 2 parts: // - // At the top level, the entire file is divided up into "passes" omn which we try and limit the + // At the top level, the entire file is divided up into "passes" on which we try and limit the // total amount of temporary memory (compressed data, decompressed data) in use // via _input_pass_read_limit. // diff --git a/cpp/src/io/parquet/reader_impl_chunking.hpp b/cpp/src/io/parquet/reader_impl_chunking.hpp index b959c793011..3a3cdd34a58 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.hpp +++ b/cpp/src/io/parquet/reader_impl_chunking.hpp @@ -41,6 +41,12 @@ struct file_intermediate_data { // is not capped by global_skip_rows and global_num_rows. std::vector input_pass_start_row_count{}; + // number of rows to be read from each data source + std::vector num_rows_per_source{}; + + // partial sum of the number of rows per data source + std::vector exclusive_sum_num_rows_per_source{}; + size_t _current_input_pass{0}; // current input pass index size_t _output_chunk_count{0}; // how many output chunks we have produced diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index ebd4affd099..ac3e0e5a89e 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -952,7 +952,7 @@ std::vector aggregate_reader_metadata::get_pandas_index_names() con return names; } -std::tuple> +std::tuple, std::vector> aggregate_reader_metadata::select_row_groups( host_span const> row_group_indices, int64_t skip_rows_opt, @@ -983,6 +983,9 @@ aggregate_reader_metadata::select_row_groups( static_cast(from_opts.second)}; }(); + // Get number of rows in each data source + std::vector num_rows_per_source(per_file_metadata.size(), 0); + if (!row_group_indices.empty()) { CUDF_EXPECTS(row_group_indices.size() == per_file_metadata.size(), "Must specify row groups for each source"); @@ -996,28 +999,40 @@ aggregate_reader_metadata::select_row_groups( selection.emplace_back(rowgroup_idx, rows_to_read, src_idx); // if page-level indexes are present, then collect extra chunk and page info. 
column_info_for_row_group(selection.back(), 0); - rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows; + auto const rows_this_rg = get_row_group(rowgroup_idx, src_idx).num_rows; + rows_to_read += rows_this_rg; + num_rows_per_source[src_idx] += rows_this_rg; } } } else { size_type count = 0; for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { auto const& fmd = per_file_metadata[src_idx]; - for (size_t rg_idx = 0; rg_idx < fmd.row_groups.size(); ++rg_idx) { + for (size_t rg_idx = 0; + rg_idx < fmd.row_groups.size() and count < rows_to_skip + rows_to_read; + ++rg_idx) { auto const& rg = fmd.row_groups[rg_idx]; auto const chunk_start_row = count; count += rg.num_rows; if (count > rows_to_skip || count == 0) { + num_rows_per_source[src_idx] += count; + num_rows_per_source[src_idx] -= (chunk_start_row < rows_to_skip and count > rows_to_skip) + ? rows_to_skip + : chunk_start_row; + selection.emplace_back(rg_idx, chunk_start_row, src_idx); // if page-level indexes are present, then collect extra chunk and page info. column_info_for_row_group(selection.back(), chunk_start_row); } - if (count >= rows_to_skip + rows_to_read) { break; } + // Adjust the number of rows for the last source file. + if (count >= rows_to_skip + rows_to_read) { + num_rows_per_source[src_idx] -= count - rows_to_skip - rows_to_read; + } } } } - return {rows_to_skip, rows_to_read, std::move(selection)}; + return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)}; } std::tuple, diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 9aeb19a7723..fa970a5737b 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -279,17 +279,17 @@ class aggregate_reader_metadata { * @param output_column_schemas schema indices of output columns * @param filter Optional AST expression to filter row groups based on Column chunk statistics * @param stream CUDA stream used for device memory operations and kernel launches - * @return A tuple of corrected row_start, row_count and list of row group indexes and its - * starting row + * @return A tuple of corrected row_start, row_count, list of row group indexes and its + * starting row, and list of number of rows per source. 
*/ - [[nodiscard]] std::tuple> select_row_groups( - host_span const> row_group_indices, - int64_t row_start, - std::optional const& row_count, - host_span output_dtypes, - host_span output_column_schemas, - std::optional> filter, - rmm::cuda_stream_view stream) const; + [[nodiscard]] std::tuple, std::vector> + select_row_groups(host_span const> row_group_indices, + int64_t row_start, + std::optional const& row_count, + host_span output_dtypes, + host_span output_column_schemas, + std::optional> filter, + rmm::cuda_stream_view stream) const; /** * @brief Filters and reduces down to a selection of columns diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index f28a7311ccb..ff47dfc4cf3 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1235,8 +1235,10 @@ void reader::impl::preprocess_file(read_mode mode) [](auto const& col) { return col.type; }); } - std::tie( - _file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) = + std::tie(_file_itm_data.global_skip_rows, + _file_itm_data.global_num_rows, + _file_itm_data.row_groups, + _file_itm_data.num_rows_per_source) = _metadata->select_row_groups(_options.row_group_indices, _options.skip_rows, _options.num_rows, @@ -1245,9 +1247,18 @@ void reader::impl::preprocess_file(read_mode mode) _expr_conv.get_converted_expr(), _stream); + // Inclusive scan the number of rows per source + if (not _expr_conv.get_converted_expr().has_value() and mode == read_mode::CHUNKED_READ) { + _file_itm_data.exclusive_sum_num_rows_per_source.resize( + _file_itm_data.num_rows_per_source.size()); + thrust::inclusive_scan(_file_itm_data.num_rows_per_source.cbegin(), + _file_itm_data.num_rows_per_source.cend(), + _file_itm_data.exclusive_sum_num_rows_per_source.begin()); + } + // check for page indexes - _has_page_index = std::all_of(_file_itm_data.row_groups.begin(), - _file_itm_data.row_groups.end(), + _has_page_index = std::all_of(_file_itm_data.row_groups.cbegin(), + _file_itm_data.row_groups.cend(), [](auto const& row_group) { return row_group.has_page_index(); }); if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() && From 3891e3a9117bad4c5bab2f2cebe1ed3114dac56f Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 9 Jul 2024 00:54:00 +0000 Subject: [PATCH 02/22] Adding gtests for num_rows_per_source --- cpp/tests/io/parquet_chunked_reader_test.cu | 208 ++++++++++++++++++++ cpp/tests/io/parquet_reader_test.cpp | 150 ++++++++++++++ 2 files changed, 358 insertions(+) diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index cff85647725..cc5c7d6bc13 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1477,3 +1477,211 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadOutOfBoundChunks) CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *result); } } + +TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) +{ + constexpr int num_rows = 10'723; // A prime number + constexpr int rows_in_row_group = 500; + + // Table with single col of random int64 values + auto int64_data = random_values(num_rows); + auto int64_col = int64s_col(int64_data.begin(), int64_data.end()).release(); + + std::vector> input_columns; + input_columns.emplace_back(std::move(int64_col)); + + // Write to Parquet + auto const [expected, filepath] = write_file(input_columns, + "num_rows_per_source", + false, + false, + 
cudf::io::default_max_page_size_bytes, + rows_in_row_group); + + auto const read_table_and_nrows_per_source = [](cudf::io::chunked_parquet_reader const& reader) { + auto out_tables = std::vector>{}; + int num_chunks = 0; + auto nrows_per_source = std::vector{}; + // should always be true + EXPECT_EQ(reader.has_next(), true); + while (reader.has_next()) { + auto chunk = reader.read_chunk(); + out_tables.emplace_back(std::move(chunk.tbl)); + num_chunks++; + if (nrows_per_source.empty()) { + nrows_per_source = chunk.metadata.num_rows_per_source; + } else { + std::transform(chunk.metadata.num_rows_per_source.cbegin(), + chunk.metadata.num_rows_per_source.cend(), + nrows_per_source.begin(), + nrows_per_source.begin(), + std::plus()); + } + } + auto out_tviews = std::vector{}; + for (auto const& tbl : out_tables) { + out_tviews.emplace_back(tbl->view()); + } + + return std::tuple(cudf::concatenate(out_tviews), num_chunks, nrows_per_source); + }; + + // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows + { + auto constexpr output_read_limit = 4'500; + auto constexpr pass_read_limit = 8'500; + + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}).build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result->view()); + ASSERT_EQ(num_rows_per_source.size(), 1); + ASSERT_EQ(num_rows_per_source[0], num_rows); + } + + // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read + { + // TODO: rows_to_skip not applicable until + // https://github.com/rapidsai/cudf/issues/16186 is fixed + auto const rows_to_skip = 0; + auto const rows_to_read = 7'232; + auto constexpr output_read_limit = 4'500; + auto constexpr pass_read_limit = 8'500; + + auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}) + .skip_rows(rows_to_skip) + .num_rows(rows_to_read) + .build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + auto int64_col_selected = int64s_col(int64_data.begin() + rows_to_skip, + int64_data.begin() + rows_to_skip + rows_to_read) + .release(); + + cudf::table_view const expected_selected({int64_col_selected->view()}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); + ASSERT_EQ(num_rows_per_source.size(), 1); + ASSERT_EQ(num_rows_per_source[0], rows_to_read); + } + + // num_rows_per_source must be empty + { + auto const max_value = 100; + auto constexpr output_read_limit = 4'500; + auto constexpr pass_read_limit = 8'500; + auto literal_value = cudf::numeric_scalar{max_value}; + auto literal = cudf::ast::literal{literal_value}; + auto col_ref = cudf::ast::column_reference(0); + auto filter_expression = + cudf::ast::operation(cudf::ast::ast_operator::LESS_EQUAL, col_ref, literal); + + auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}) + .filter(filter_expression) + .build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = 
read_table_and_nrows_per_source(reader); + + std::vector int64_data_filtered; + int64_data_filtered.reserve(num_rows); + std::copy_if( + int64_data.begin(), int64_data.end(), std::back_inserter(int64_data_filtered), [=](auto val) { + return val <= max_value; + }); + + auto int64_col_filtered = + int64s_col(int64_data_filtered.begin(), int64_data_filtered.end()).release(); + + cudf::table_view expected_filtered({int64_col_filtered->view()}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_filtered, result->view()); + ASSERT_EQ(num_rows_per_source.empty(), true); + } + + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + { + auto const nsources = 10; + auto constexpr output_read_limit = 25'000; + auto constexpr pass_read_limit = 45'000; + std::vector const datasources(nsources, filepath); + + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}).build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + // Initialize expected_counts + std::vector const expected_counts(nsources, num_rows); + + ASSERT_EQ(num_rows_per_source.size(), nsources); + ASSERT_EQ( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), + true); + } + + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + { + // TODO: rows_to_skip not applicable until + // https://github.com/rapidsai/cudf/issues/16186 is fixed + auto const rows_to_skip = 0; + auto const rows_to_read = 47'232; + auto constexpr output_read_limit = 25'000; + auto constexpr pass_read_limit = 45'000; + + auto const nsources = 10; + std::vector const datasources(nsources, filepath); + + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}) + .skip_rows(rows_to_skip) + .num_rows(rows_to_read) + .build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + // Initialize expected_counts + std::vector expected_counts(nsources, num_rows); + + // Adjust expected_counts for rows_to_skip + int64_t counter = 0; + for (auto& nrows : expected_counts) { + if (counter < rows_to_skip) { + counter += nrows; + nrows = (counter >= rows_to_skip) ? counter - rows_to_skip : 0; + } else { + break; + } + } + + // Reset the counter + counter = 0; + + // Adjust expected_counts for rows_to_read + for (auto& nrows : expected_counts) { + if (counter < rows_to_read) { + counter += nrows; + nrows = (counter >= rows_to_read) ? 
rows_to_read - counter + nrows : nrows; + } else if (counter > rows_to_read) { + nrows = 0; + } + } + + ASSERT_EQ(num_rows_per_source.size(), nsources); + ASSERT_EQ( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), + true); + } +} diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 2edf9e0aee6..4adb9be895e 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2243,6 +2243,156 @@ TEST_F(ParquetReaderTest, StringsWithPageStats) } } +TEST_F(ParquetReaderTest, NumRowsPerSource) +{ + int constexpr num_rows = 10'723; // A prime number + int constexpr rows_in_row_group = 500; + + // Table with single col of random int64 values + auto int64_data = random_values(num_rows); + column_wrapper int64_col{ + int64_data.begin(), int64_data.end(), cudf::test::iterators::no_nulls()}; + cudf::table_view expected({int64_col}); + + // Write to Parquet + auto const filepath = temp_env->get_temp_filepath("NumRowsPerSource.parquet"); + auto const out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .row_group_size_rows(rows_in_row_group) + .build(); + cudf::io::write_parquet(out_opts); + + // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows + { + auto const in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); + auto const result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + ASSERT_EQ(result.metadata.num_rows_per_source.size(), 1); + ASSERT_EQ(result.metadata.num_rows_per_source[0], num_rows); + } + + // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read + { + auto constexpr rows_to_skip = 557; // a prime number != rows_in_row_group + auto constexpr rows_to_read = 7'232; + auto const in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .skip_rows(rows_to_skip) + .num_rows(rows_to_read) + .build(); + auto const result = cudf::io::read_parquet(in_opts); + column_wrapper int64_col_selected{int64_data.begin() + rows_to_skip, + int64_data.begin() + rows_to_skip + rows_to_read, + cudf::test::iterators::no_nulls()}; + + cudf::table_view const expected_selected({int64_col_selected}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result.tbl->view()); + ASSERT_EQ(result.metadata.num_rows_per_source.size(), 1); + ASSERT_EQ(result.metadata.num_rows_per_source[0], rows_to_read); + } + + // num_rows_per_source must be empty + { + auto constexpr max_value = 100; + auto literal_value = cudf::numeric_scalar{max_value}; + auto literal = cudf::ast::literal{literal_value}; + auto col_ref = cudf::ast::column_reference(0); + auto filter_expression = + cudf::ast::operation(cudf::ast::ast_operator::LESS_EQUAL, col_ref, literal); + + auto const in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}) + .filter(filter_expression) + .build(); + + std::vector int64_data_filtered; + int64_data_filtered.reserve(num_rows); + std::copy_if( + int64_data.begin(), int64_data.end(), std::back_inserter(int64_data_filtered), [=](auto val) { + return val <= max_value; + }); + column_wrapper int64_col_filtered{ + int64_data_filtered.begin(), int64_data_filtered.end(), cudf::test::iterators::no_nulls()}; + + cudf::table_view expected_filtered({int64_col_filtered}); + + auto const result = cudf::io::read_parquet(in_opts); + + 
CUDF_TEST_EXPECT_TABLES_EQUAL(expected_filtered, result.tbl->view()); + ASSERT_EQ(result.metadata.num_rows_per_source.size(), 0); + } + + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + { + auto constexpr nsources = 10; + std::vector const datasources(nsources, filepath); + + auto const in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{datasources}).build(); + auto const result = cudf::io::read_parquet(in_opts); + + // Initialize expected_counts + std::vector const expected_counts(nsources, num_rows); + + ASSERT_EQ(result.metadata.num_rows_per_source.size(), nsources); + ASSERT_EQ(std::equal(expected_counts.cbegin(), + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin()), + true); + } + + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + { + auto constexpr rows_to_skip = 25'999; + auto constexpr rows_to_read = 47'232; + + auto constexpr nsources = 10; + std::vector const datasources(nsources, filepath); + + auto const in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{datasources}) + .skip_rows(rows_to_skip) + .num_rows(rows_to_read) + .build(); + + auto const result = cudf::io::read_parquet(in_opts); + + // Initialize expected_counts + std::vector expected_counts(nsources, num_rows); + + // Adjust expected_counts for rows_to_skip + int64_t counter = 0; + for (auto& nrows : expected_counts) { + if (counter < rows_to_skip) { + counter += nrows; + nrows = (counter >= rows_to_skip) ? counter - rows_to_skip : 0; + } else { + break; + } + } + + // Reset the counter + counter = 0; + + // Adjust expected_counts for rows_to_read + for (auto& nrows : expected_counts) { + if (counter < rows_to_read) { + counter += nrows; + nrows = (counter >= rows_to_read) ? 
rows_to_read - counter + nrows : nrows; + } else if (counter > rows_to_read) { + nrows = 0; + } + } + + ASSERT_EQ(result.metadata.num_rows_per_source.size(), nsources); + ASSERT_EQ(std::equal(expected_counts.cbegin(), + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin()), + true); + } +} + /////////////////// // metadata tests From d3863a669ae0151f5f0222de20626ae7cc3825c6 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 9 Jul 2024 01:43:14 +0000 Subject: [PATCH 03/22] Minor updates --- cpp/tests/io/parquet_chunked_reader_test.cu | 26 ++++++++++----------- cpp/tests/io/parquet_reader_test.cpp | 18 +++++++------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index cc5c7d6bc13..119f7d9e832 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1502,14 +1502,12 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) auto out_tables = std::vector>{}; int num_chunks = 0; auto nrows_per_source = std::vector{}; - // should always be true - EXPECT_EQ(reader.has_next(), true); while (reader.has_next()) { auto chunk = reader.read_chunk(); out_tables.emplace_back(std::move(chunk.tbl)); num_chunks++; if (nrows_per_source.empty()) { - nrows_per_source = chunk.metadata.num_rows_per_source; + nrows_per_source = std::move(chunk.metadata.num_rows_per_source); } else { std::transform(chunk.metadata.num_rows_per_source.cbegin(), chunk.metadata.num_rows_per_source.cend(), @@ -1539,13 +1537,13 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); CUDF_TEST_EXPECT_TABLES_EQUAL(expected->view(), result->view()); - ASSERT_EQ(num_rows_per_source.size(), 1); - ASSERT_EQ(num_rows_per_source[0], num_rows); + EXPECT_EQ(num_rows_per_source.size(), 1); + EXPECT_EQ(num_rows_per_source[0], num_rows); } // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read { - // TODO: rows_to_skip not applicable until + // TODO: rows_to_skip not applicable for chunked read until // https://github.com/rapidsai/cudf/issues/16186 is fixed auto const rows_to_skip = 0; auto const rows_to_read = 7'232; @@ -1568,8 +1566,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) cudf::table_view const expected_selected({int64_col_selected->view()}); CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); - ASSERT_EQ(num_rows_per_source.size(), 1); - ASSERT_EQ(num_rows_per_source[0], rows_to_read); + EXPECT_EQ(num_rows_per_source.size(), 1); + EXPECT_EQ(num_rows_per_source[0], rows_to_read); } // num_rows_per_source must be empty @@ -1604,7 +1602,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) cudf::table_view expected_filtered({int64_col_filtered->view()}); CUDF_TEST_EXPECT_TABLES_EQUAL(expected_filtered, result->view()); - ASSERT_EQ(num_rows_per_source.empty(), true); + EXPECT_EQ(num_rows_per_source.empty(), true); } // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows @@ -1624,15 +1622,15 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) // Initialize expected_counts std::vector const expected_counts(nsources, num_rows); - ASSERT_EQ(num_rows_per_source.size(), nsources); - ASSERT_EQ( + EXPECT_EQ(num_rows_per_source.size(), nsources); + EXPECT_EQ( std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), 
true); } // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows { - // TODO: rows_to_skip not applicable until + // TODO: rows_to_skip not applicable for chunked read until // https://github.com/rapidsai/cudf/issues/16186 is fixed auto const rows_to_skip = 0; auto const rows_to_read = 47'232; @@ -1679,8 +1677,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) } } - ASSERT_EQ(num_rows_per_source.size(), nsources); - ASSERT_EQ( + EXPECT_EQ(num_rows_per_source.size(), nsources); + EXPECT_EQ( std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), true); } diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 4adb9be895e..8c128eb1442 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2269,8 +2269,8 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) auto const result = cudf::io::read_parquet(in_opts); CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); - ASSERT_EQ(result.metadata.num_rows_per_source.size(), 1); - ASSERT_EQ(result.metadata.num_rows_per_source[0], num_rows); + EXPECT_EQ(result.metadata.num_rows_per_source.size(), 1); + EXPECT_EQ(result.metadata.num_rows_per_source[0], num_rows); } // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read @@ -2289,8 +2289,8 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) cudf::table_view const expected_selected({int64_col_selected}); CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result.tbl->view()); - ASSERT_EQ(result.metadata.num_rows_per_source.size(), 1); - ASSERT_EQ(result.metadata.num_rows_per_source[0], rows_to_read); + EXPECT_EQ(result.metadata.num_rows_per_source.size(), 1); + EXPECT_EQ(result.metadata.num_rows_per_source[0], rows_to_read); } // num_rows_per_source must be empty @@ -2320,7 +2320,7 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) auto const result = cudf::io::read_parquet(in_opts); CUDF_TEST_EXPECT_TABLES_EQUAL(expected_filtered, result.tbl->view()); - ASSERT_EQ(result.metadata.num_rows_per_source.size(), 0); + EXPECT_EQ(result.metadata.num_rows_per_source.size(), 0); } // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows @@ -2335,8 +2335,8 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) // Initialize expected_counts std::vector const expected_counts(nsources, num_rows); - ASSERT_EQ(result.metadata.num_rows_per_source.size(), nsources); - ASSERT_EQ(std::equal(expected_counts.cbegin(), + EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); + EXPECT_EQ(std::equal(expected_counts.cbegin(), expected_counts.cend(), result.metadata.num_rows_per_source.cbegin()), true); @@ -2385,8 +2385,8 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) } } - ASSERT_EQ(result.metadata.num_rows_per_source.size(), nsources); - ASSERT_EQ(std::equal(expected_counts.cbegin(), + EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); + EXPECT_EQ(std::equal(expected_counts.cbegin(), expected_counts.cend(), result.metadata.num_rows_per_source.cbegin()), true); From ebdfad557cb7b3546f5cb3f0b1e8e745c179b0e6 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 9 Jul 2024 21:19:43 +0000 Subject: [PATCH 04/22] gtests for empty dfs and minor improvements --- cpp/include/cudf/io/types.hpp | 5 ++- cpp/src/io/parquet/reader_impl.cpp | 2 +- cpp/tests/io/parquet_chunked_reader_test.cu | 45 ++++++++++++++++++++- cpp/tests/io/parquet_reader_test.cpp | 38 +++++++++++++++-- 4 files changed, 82 
insertions(+), 8 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 8cc48105934..431a5e7be83 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -277,8 +277,9 @@ struct column_name_info { struct table_metadata { std::vector schema_info; //!< Detailed name information for the entire output hierarchy - std::vector num_rows_per_source; //!< Number of rows read from each data source. Empty - //!< vector if AST filters are being used. + std::vector num_rows_per_source; //!< Number of rows read from each data source. + //!< Currently only computed for Parquet readers if no + //!< AST filters being used. Empty vector otherwise. std::map user_data; //!< Format-dependent metadata of the first input //!< file as key-values pairs (deprecated) std::vector> diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 19e003f7867..1e6bb5d1be0 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -556,7 +556,7 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) // If already empty (empty dataframe case), simply initialize to a list of zeros if (not out_metadata.num_rows_per_source.size()) { out_metadata.num_rows_per_source = - std::vector(out_metadata.num_rows_per_source.size(), 0); + std::vector(_file_itm_data.num_rows_per_source.size(), 0); } // If this is previously non-empty, simply fill in zeros else { diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 119f7d9e832..0583d4ccc9f 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1484,8 +1484,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) constexpr int rows_in_row_group = 500; // Table with single col of random int64 values - auto int64_data = random_values(num_rows); - auto int64_col = int64s_col(int64_data.begin(), int64_data.end()).release(); + auto const int64_data = random_values(num_rows); + auto int64_col = int64s_col(int64_data.begin(), int64_data.end()).release(); std::vector> input_columns; input_columns.emplace_back(std::move(int64_col)); @@ -1682,4 +1682,45 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), true); } + + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = 0 + { + auto constexpr output_read_limit = 4'500; + auto constexpr pass_read_limit = 8'500; + auto const nsources = 10; + + // Table with single col of random int64 values + auto int64_empty_col = int64s_col{}.release(); + + std::vector> input_empty_columns; + input_empty_columns.emplace_back(std::move(int64_empty_col)); + + // Write to Parquet + auto const [expected_empty, filepath_empty] = write_file(input_empty_columns, + "num_rows_per_source_empty", + false, + false, + cudf::io::default_max_page_size_bytes, + rows_in_row_group); + + std::vector const datasources(nsources, filepath_empty); + + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}).build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + // Initialize expected_counts + std::vector const expected_counts(nsources, 0); + + 
CUDF_TEST_EXPECT_TABLES_EQUAL(expected_empty->view(), result->view()); + + EXPECT_EQ(num_chunks, 1); + EXPECT_EQ(num_rows_per_source.size(), nsources); + EXPECT_EQ( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), + true); + } } diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 8c128eb1442..52f71b9c18d 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2249,10 +2249,10 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) int constexpr rows_in_row_group = 500; // Table with single col of random int64 values - auto int64_data = random_values(num_rows); - column_wrapper int64_col{ + auto const int64_data = random_values(num_rows); + column_wrapper const int64_col{ int64_data.begin(), int64_data.end(), cudf::test::iterators::no_nulls()}; - cudf::table_view expected({int64_col}); + cudf::table_view const expected({int64_col}); // Write to Parquet auto const filepath = temp_env->get_temp_filepath("NumRowsPerSource.parquet"); @@ -2391,6 +2391,38 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) result.metadata.num_rows_per_source.cbegin()), true); } + + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = 0 + { + auto const nsources = 10; + + column_wrapper const int64_empty_col{}; + cudf::table_view const expected_empty({int64_empty_col}); + + // Write to Parquet + auto const filepath_empty = temp_env->get_temp_filepath("NumRowsPerSourceEmpty.parquet"); + auto const out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath_empty}, expected_empty) + .row_group_size_rows(rows_in_row_group) + .build(); + cudf::io::write_parquet(out_opts); + + // Read from Parquet + std::vector const datasources(nsources, filepath_empty); + + auto const in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{datasources}).build(); + auto const result = cudf::io::read_parquet(in_opts); + + // Initialize expected_counts + std::vector const expected_counts(nsources, 0); + + EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); + EXPECT_EQ(std::equal(expected_counts.cbegin(), + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin()), + true); + } } /////////////////// From 0fd68905fdc34f62206d1da42cb62fd8b031ed5d Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 9 Jul 2024 21:24:38 +0000 Subject: [PATCH 05/22] separate out the empty df gtest --- cpp/tests/io/parquet_reader_test.cpp | 49 ++++++++++++++-------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 52f71b9c18d..d80df327d25 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2391,38 +2391,37 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) result.metadata.num_rows_per_source.cbegin()), true); } +} - // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = 0 - { - auto const nsources = 10; +TEST_F(ParquetReaderTest, NumRowsPerSourceEmptyTable) +{ + auto const nsources = 10; - column_wrapper const int64_empty_col{}; - cudf::table_view const expected_empty({int64_empty_col}); + column_wrapper const int64_empty_col{}; + cudf::table_view const expected_empty({int64_empty_col}); - // Write to Parquet - auto const filepath_empty = temp_env->get_temp_filepath("NumRowsPerSourceEmpty.parquet"); - auto const out_opts = - 
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath_empty}, expected_empty) - .row_group_size_rows(rows_in_row_group) - .build(); - cudf::io::write_parquet(out_opts); + // Write to Parquet + auto const filepath_empty = temp_env->get_temp_filepath("NumRowsPerSourceEmpty.parquet"); + auto const out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath_empty}, expected_empty) + .build(); + cudf::io::write_parquet(out_opts); - // Read from Parquet - std::vector const datasources(nsources, filepath_empty); + // Read from Parquet + std::vector const datasources(nsources, filepath_empty); - auto const in_opts = - cudf::io::parquet_reader_options::builder(cudf::io::source_info{datasources}).build(); - auto const result = cudf::io::read_parquet(in_opts); + auto const in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{datasources}).build(); + auto const result = cudf::io::read_parquet(in_opts); - // Initialize expected_counts - std::vector const expected_counts(nsources, 0); + // Initialize expected_counts + std::vector const expected_counts(nsources, 0); - EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); - EXPECT_EQ(std::equal(expected_counts.cbegin(), - expected_counts.cend(), - result.metadata.num_rows_per_source.cbegin()), - true); - } + EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); + EXPECT_EQ(std::equal(expected_counts.cbegin(), + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin()), + true); } /////////////////// From d268873dff9ffbb46dc3788dc2ed00807b875900 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 10 Jul 2024 21:25:26 +0000 Subject: [PATCH 06/22] Add num_rows_per_source vector to the types.pxd for future use in python when needed --- python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd index 8d87deb1472..0a6bddcd907 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd @@ -81,6 +81,7 @@ cdef extern from "cudf/io/types.hpp" \ map[string, string] user_data vector[unordered_map[string, string]] per_file_user_data vector[column_name_info] schema_info + vector[size_t] num_rows_per_source cdef cppclass table_with_metadata: unique_ptr[table] tbl From 702b0ee794c53980a51598742816a1c6ad4d3c5a Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 11 Jul 2024 00:12:30 +0000 Subject: [PATCH 07/22] Adjust for global_skip_rows while computing num_rows_per_source --- cpp/src/io/parquet/reader_impl.cpp | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 1e6bb5d1be0..f5d0a10a2ec 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -621,23 +621,22 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) return finalize_output(mode, out_metadata, out_columns); } -std::vector reader::impl::calculate_output_num_rows_per_source(size_t const start_row, - size_t const num_rows) +std::vector reader::impl::calculate_output_num_rows_per_source(size_t const chunk_start_row, + size_t const chunk_num_rows) { std::vector num_rows_per_source(_file_itm_data.num_rows_per_source.size(), 0); - // TODO: Modify to simply end_row = start_row + num_rows once - // 
https://github.com/rapidsai/cudf/issues/16186 is fixed. - auto const end_row = std::min(_file_itm_data.global_skip_rows + _file_itm_data.global_num_rows, - start_row + num_rows); - CUDF_EXPECTS(start_row < end_row and - end_row <= _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows, + // Subtract global skip rows from the start_row as we took care of that when computing + // _file_itm_data.num_rows_per_source + auto const start_row = chunk_start_row - _file_itm_data.global_skip_rows; + auto const end_row = start_row + chunk_num_rows; + CUDF_EXPECTS(start_row <= end_row and end_row <= _file_itm_data.global_num_rows, "Encountered invalid output chunk row bounds."); // Copy reference to a const local variable for better readability auto const& partial_sum_nrows_source = _file_itm_data.exclusive_sum_num_rows_per_source; - // Binary search start_row and end_row in exclusive_sum_num_rows_per_source array + // Binary search start_row and end_row in exclusive_sum_num_rows_per_source vector auto const start_iter = thrust::upper_bound( partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row); auto const end_iter = @@ -654,7 +653,7 @@ std::vector reader::impl::calculate_output_num_rows_per_source(size_t co // If the entire chunk is from the same source file, then the count is simply num_rows if (start_idx == end_idx) { - num_rows_per_source[start_idx] = num_rows; + num_rows_per_source[start_idx] = chunk_num_rows; } else { // Compute the number of rows from the first source file num_rows_per_source[start_idx] = partial_sum_nrows_source[start_idx] - start_row; From d031af99de64dcd878ba918766de97c3d2b33d7b Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 11 Jul 2024 00:43:58 +0000 Subject: [PATCH 08/22] Fix segfault when skip_rows > 0 and num_passes > 1 in chunked_parquet_reader --- cpp/src/io/parquet/reader_impl_chunking.cu | 26 +++-- cpp/src/io/parquet/reader_impl_helpers.cpp | 15 +-- cpp/tests/io/parquet_chunked_reader_test.cu | 108 ++++++++++++++++---- 3 files changed, 119 insertions(+), 30 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index d371ef5de93..c61aa47f73a 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1245,8 +1245,10 @@ void reader::impl::setup_next_pass(read_mode mode) // everything we will be reading, regardless of what pass we are on. // num_rows is how many rows we are reading this pass. pass.skip_rows = - global_start_row + _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; + + // global_start_row should only be added to the first input pass and not to subsequent ones. + pass.skip_rows += (_file_itm_data._current_input_pass == 0) ? 
global_start_row : 0; pass.num_rows = end_row - start_row; } @@ -1477,9 +1479,9 @@ void reader::impl::setup_next_subpass(read_mode mode) void reader::impl::create_global_chunk_info() { - auto const num_rows = _file_itm_data.global_num_rows; - auto const& row_groups_info = _file_itm_data.row_groups; - auto& chunks = _file_itm_data.chunks; + auto const num_rows = _file_itm_data.global_num_rows; + auto& row_groups_info = _file_itm_data.row_groups; + auto& chunks = _file_itm_data.chunks; // Descriptors for all the chunks that make up the selected columns auto const num_input_columns = _input_columns.size(); @@ -1509,7 +1511,8 @@ void reader::impl::create_global_chunk_info() // Initialize column chunk information auto remaining_rows = num_rows; - for (auto const& rg : row_groups_info) { + auto skip_rows = _file_itm_data.global_skip_rows; + for (auto& rg : row_groups_info) { auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); auto const row_group_start = rg.start_row; auto const row_group_rows = std::min(remaining_rows, row_group.num_rows); @@ -1561,7 +1564,18 @@ void reader::impl::create_global_chunk_info() schema.type == BYTE_ARRAY and _strings_to_categorical)); } - remaining_rows -= row_group_rows; + // Adjust for skip_rows when updating the remaining rows after the first group + remaining_rows -= + (skip_rows) ? std::min(rg.start_row + row_group.num_rows - skip_rows, remaining_rows) + : row_group_rows; + + // Adjust the start_row of the first row group which was left unadjusted during + // select_row_groups(). + if (skip_rows) { + rg.start_row = (skip_rows) ? skip_rows : rg.start_row; + // Set skip_rows = 0 as it is no longer needed for subsequent row_groups + skip_rows = 0; + } } } diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index ac3e0e5a89e..ddf4c0395ef 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1015,14 +1015,17 @@ aggregate_reader_metadata::select_row_groups( auto const chunk_start_row = count; count += rg.num_rows; if (count > rows_to_skip || count == 0) { - num_rows_per_source[src_idx] += count; - num_rows_per_source[src_idx] -= (chunk_start_row < rows_to_skip and count > rows_to_skip) - ? rows_to_skip - : chunk_start_row; - + // start row of this row group adjusted with rows_to_skip + auto const chunk_start_row_this_rg = + (chunk_start_row <= rows_to_skip and count > rows_to_skip) ? rows_to_skip + : chunk_start_row; + num_rows_per_source[src_idx] += count - chunk_start_row_this_rg; + + // We need the unadjusted start index of this row group to correctly initialize + // ColumnChunkDesc for this row group in create_global_chunk_info(). selection.emplace_back(rg_idx, chunk_start_row, src_idx); // if page-level indexes are present, then collect extra chunk and page info. - column_info_for_row_group(selection.back(), chunk_start_row); + column_info_for_row_group(selection.back(), chunk_start_row_this_rg); } // Adjust the number of rows for the last source file. 
if (count >= rows_to_skip + rows_to_read) { diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 0583d4ccc9f..435a3b67a4b 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1526,8 +1526,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows { - auto constexpr output_read_limit = 4'500; - auto constexpr pass_read_limit = 8'500; + auto constexpr output_read_limit = 1'500; + auto constexpr pass_read_limit = 3'500; auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}).build(); @@ -1543,12 +1543,10 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read { - // TODO: rows_to_skip not applicable for chunked read until - // https://github.com/rapidsai/cudf/issues/16186 is fixed - auto const rows_to_skip = 0; + auto const rows_to_skip = 1'237; auto const rows_to_read = 7'232; - auto constexpr output_read_limit = 4'500; - auto constexpr pass_read_limit = 8'500; + auto constexpr output_read_limit = 1'500; + auto constexpr pass_read_limit = 3'500; auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}) .skip_rows(rows_to_skip) @@ -1570,11 +1568,36 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_rows_per_source[0], rows_to_read); } + // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows - + // rows_to_read + { + auto const rows_to_skip = 1'237; + auto constexpr output_read_limit = 1'500; + auto constexpr pass_read_limit = 3'500; + + auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}) + .skip_rows(rows_to_skip) + .build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + auto int64_col_selected = + int64s_col(int64_data.begin() + rows_to_skip, int64_data.end()).release(); + + cudf::table_view const expected_selected({int64_col_selected->view()}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); + EXPECT_EQ(num_rows_per_source.size(), 1); + EXPECT_EQ(num_rows_per_source[0], num_rows - rows_to_skip); + } + // num_rows_per_source must be empty { - auto const max_value = 100; - auto constexpr output_read_limit = 4'500; - auto constexpr pass_read_limit = 8'500; + int64_t const max_value = int64_data[int64_data.size() / 2]; + auto constexpr output_read_limit = 1'500; + auto constexpr pass_read_limit = 3'500; auto literal_value = cudf::numeric_scalar{max_value}; auto literal = cudf::ast::literal{literal_value}; auto col_ref = cudf::ast::column_reference(0); @@ -1608,8 +1631,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows { auto const nsources = 10; - auto constexpr output_read_limit = 25'000; - auto constexpr pass_read_limit = 45'000; + auto constexpr output_read_limit = 15'000; + auto constexpr pass_read_limit = 35'000; std::vector const datasources(nsources, filepath); auto const options = @@ -1628,14 +1651,12 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) true); } - // num_rows_per_source.size() must be = 
10 and all num_rows_per_source[k] must be = num_rows + // num_rows_per_source.size() must be = 10 { - // TODO: rows_to_skip not applicable for chunked read until - // https://github.com/rapidsai/cudf/issues/16186 is fixed - auto const rows_to_skip = 0; + auto const rows_to_skip = 5'571; auto const rows_to_read = 47'232; - auto constexpr output_read_limit = 25'000; - auto constexpr pass_read_limit = 45'000; + auto constexpr output_read_limit = 15'000; + auto constexpr pass_read_limit = 35'000; auto const nsources = 10; std::vector const datasources(nsources, filepath); @@ -1683,6 +1704,57 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) true); } + // num_rows_per_source.size() must be = 10 + { + auto const rows_to_skip = 5'571; + auto constexpr output_read_limit = 15'000; + auto constexpr pass_read_limit = 35'000; + + auto const nsources = 10; + std::vector const datasources(nsources, filepath); + + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}) + .skip_rows(rows_to_skip) + .build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + // Initialize expected_counts + std::vector expected_counts(nsources, num_rows); + + // Adjust expected_counts for rows_to_skip + int64_t counter = 0; + for (auto& nrows : expected_counts) { + if (counter < rows_to_skip) { + counter += nrows; + nrows = (counter >= rows_to_skip) ? counter - rows_to_skip : 0; + } else { + break; + } + } + + // Reset the counter + counter = 0; + + // Adjust expected_counts for rows_to_read + for (auto& nrows : expected_counts) { + if (counter < num_rows * nsources) { + counter += nrows; + nrows = (counter >= num_rows * nsources) ? (num_rows * nsources) - counter + nrows : nrows; + } else if (counter > num_rows * nsources) { + nrows = 0; + } + } + + EXPECT_EQ(num_rows_per_source.size(), nsources); + EXPECT_EQ( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), + true); + } + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = 0 { auto constexpr output_read_limit = 4'500; From 1a207e981a90fa8ead37859742deb494ddb0f1c5 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 11 Jul 2024 00:52:56 +0000 Subject: [PATCH 09/22] Handle base cases when calculating num_rows_per_source --- cpp/src/io/parquet/reader_impl.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index f5d0a10a2ec..b8a55505bdf 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -624,6 +624,13 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) std::vector reader::impl::calculate_output_num_rows_per_source(size_t const chunk_start_row, size_t const chunk_num_rows) { + // Handle base cases. 
+ if (_file_itm_data.num_rows_per_source.size() == 0) { + return {}; + } else if (_file_itm_data.num_rows_per_source.size() == 1) { + return {chunk_num_rows}; + } + std::vector num_rows_per_source(_file_itm_data.num_rows_per_source.size(), 0); // Subtract global skip rows from the start_row as we took care of that when computing @@ -672,7 +679,8 @@ table_with_metadata reader::impl::finalize_output(read_mode mode, table_metadata& out_metadata, std::vector>& out_columns) { - // Create empty columns as needed (this can happen if we've ended up with no actual data to read) + // Create empty columns as needed (this can happen if we've ended up with no actual data to + // read) for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) { if (!_output_metadata) { column_name_info& col_name = out_metadata.schema_info[i]; From 7ad6179ac8fadcff593ebc33481ce63a0266294e Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 11 Jul 2024 04:22:25 +0000 Subject: [PATCH 10/22] Add a couple more gtests --- cpp/src/io/parquet/reader_impl.cpp | 3 +- cpp/tests/io/parquet_chunked_reader_test.cu | 31 +++++++++++++++++++++ cpp/tests/io/parquet_reader_test.cpp | 25 +++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index b8a55505bdf..7cacc4426dd 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -679,8 +679,7 @@ table_with_metadata reader::impl::finalize_output(read_mode mode, table_metadata& out_metadata, std::vector>& out_columns) { - // Create empty columns as needed (this can happen if we've ended up with no actual data to - // read) + // Create empty columns as needed (this can happen if we've ended up with no actual data to read) for (size_t i = out_columns.size(); i < _output_buffers.size(); ++i) { if (!_output_metadata) { column_name_info& col_name = out_metadata.schema_info[i]; diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 435a3b67a4b..425b576ced9 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1568,6 +1568,37 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_rows_per_source[0], rows_to_read); } + // num_rows_per_source.size() must be = 2 + { + auto constexpr rows_to_skip = 15'723; + auto constexpr output_read_limit = 1'500; + auto constexpr pass_read_limit = 3'500; + + auto constexpr nsources = 2; + std::vector const datasources(nsources, filepath); + + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}) + .skip_rows(rows_to_skip) + .build(); + + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + + column_wrapper int64_col_selected{int64_data.begin() + rows_to_skip - num_rows, + int64_data.end(), + cudf::test::iterators::no_nulls()}; + + cudf::table_view const expected_selected({int64_col_selected}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); + EXPECT_EQ(num_rows_per_source.size(), 2); + EXPECT_EQ(num_rows_per_source[0], 0); + EXPECT_EQ(num_rows_per_source[1], nsources * num_rows - rows_to_skip); + } + // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows - // rows_to_read { diff --git 
a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index d80df327d25..36618a9f210 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2323,6 +2323,31 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) EXPECT_EQ(result.metadata.num_rows_per_source.size(), 0); } + // num_rows_per_source.size() must be = 2 + { + auto constexpr rows_to_skip = 15'723; + auto constexpr nsources = 2; + std::vector const datasources(nsources, filepath); + + auto const in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{datasources}) + .skip_rows(rows_to_skip) + .build(); + + auto const result = cudf::io::read_parquet(in_opts); + + column_wrapper int64_col_selected{int64_data.begin() + rows_to_skip - num_rows, + int64_data.end(), + cudf::test::iterators::no_nulls()}; + + cudf::table_view const expected_selected({int64_col_selected}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result.tbl->view()); + EXPECT_EQ(result.metadata.num_rows_per_source.size(), 2); + EXPECT_EQ(result.metadata.num_rows_per_source[0], 0); + EXPECT_EQ(result.metadata.num_rows_per_source[1], nsources * num_rows - rows_to_skip); + } + // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows { auto constexpr nsources = 10; From 975b7c3f3f4289ce90d21d345197e3a3bd953470 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 11 Jul 2024 11:23:58 -0700 Subject: [PATCH 11/22] Revert the chunk_start_row in column_info_for_row_group the page indexes rely on absolute row numbers, not adjusted for skip_rows Co-authored-by: Ed Seidl --- cpp/src/io/parquet/reader_impl_helpers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index a817a71d5e4..4a5866191e0 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1018,7 +1018,7 @@ aggregate_reader_metadata::select_row_groups( // ColumnChunkDesc for this row group in create_global_chunk_info(). selection.emplace_back(rg_idx, chunk_start_row, src_idx); // if page-level indexes are present, then collect extra chunk and page info. - column_info_for_row_group(selection.back(), chunk_start_row_this_rg); + column_info_for_row_group(selection.back(), chunk_start_row); } // Adjust the number of rows for the last source file. 
if (count >= rows_to_skip + rows_to_read) { From 0ac70b290527640fc7a9c0798c7b147cc92e0dc6 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Fri, 12 Jul 2024 02:21:17 +0000 Subject: [PATCH 12/22] Minor bug fix --- cpp/tests/io/parquet_chunked_reader_test.cu | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 425b576ced9..9ffcc13b059 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1587,11 +1587,10 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); - column_wrapper int64_col_selected{int64_data.begin() + rows_to_skip - num_rows, - int64_data.end(), - cudf::test::iterators::no_nulls()}; + auto int64_col_selected = + int64s_col(int64_data.begin() + rows_to_skip - num_rows, int64_data.end()).release(); - cudf::table_view const expected_selected({int64_col_selected}); + cudf::table_view const expected_selected({int64_col_selected->view()}); CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); EXPECT_EQ(num_rows_per_source.size(), 2); From bcc3beccc7f2d617509555e767eaae5de4ea9228 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 16 Jul 2024 19:19:43 +0000 Subject: [PATCH 13/22] Remove the unreachable branch --- cpp/src/io/parquet/reader_impl.cpp | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 7cacc4426dd..6e0c3097700 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -553,16 +553,9 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) if (!has_more_work()) { // Compute number of rows per source if no AST filters if (not _expr_conv.get_converted_expr().has_value()) { - // If already empty (empty dataframe case), simply initialize to a list of zeros - if (not out_metadata.num_rows_per_source.size()) { - out_metadata.num_rows_per_source = - std::vector(_file_itm_data.num_rows_per_source.size(), 0); - } - // If this is previously non-empty, simply fill in zeros - else { - thrust::fill( - out_metadata.num_rows_per_source.begin(), out_metadata.num_rows_per_source.end(), 0); - } + // Empty dataframe case: Simply initialize to a list of zeros + out_metadata.num_rows_per_source = + std::vector(_file_itm_data.num_rows_per_source.size(), 0); } // Finalize output From 363b0da8dbc6facfa3feb228ef4bca68a9003b2b Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 16 Jul 2024 12:38:10 -0700 Subject: [PATCH 14/22] Applying suggestions Co-authored-by: Vukasin Milovanovic --- cpp/src/io/parquet/reader_impl.hpp | 4 ++-- cpp/tests/io/parquet_chunked_reader_test.cu | 5 ++--- cpp/tests/io/parquet_reader_test.cpp | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index c4565280957..f14a658e554 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -355,8 +355,8 @@ class reader::impl { * @param num_rows The number of rows in the the output chunk * @return Vector of number of rows from each respective data source in the output chunk */ - [[nodiscard]] std::vector calculate_output_num_rows_per_source(size_t const start_row, - 
size_t const num_rows); + [[nodiscard]] std::vector calculate_output_num_rows_per_source(size_t start_row, + size_t num_rows); rmm::cuda_stream_view _stream; rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()}; diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 9ffcc13b059..1b1c7aadb8f 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1821,8 +1821,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_chunks, 1); EXPECT_EQ(num_rows_per_source.size(), nsources); - EXPECT_EQ( - std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), - true); + EXPECT_TRUE( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } } diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 36618a9f210..7db934781b0 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2361,10 +2361,9 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) std::vector const expected_counts(nsources, num_rows); EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); - EXPECT_EQ(std::equal(expected_counts.cbegin(), + EXPECT_TRUE(std::equal(expected_counts.cbegin(), expected_counts.cend(), - result.metadata.num_rows_per_source.cbegin()), - true); + result.metadata.num_rows_per_source.cbegin())); } // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows From e239382e2211f6466803696bde7c6c9229c183c4 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Tue, 16 Jul 2024 20:27:57 +0000 Subject: [PATCH 15/22] Suggestions from commits --- cpp/src/io/parquet/reader_impl.hpp | 8 ++--- cpp/tests/io/parquet_chunked_reader_test.cu | 38 +++++++++------------ cpp/tests/io/parquet_reader_test.cpp | 30 ++++++++-------- 3 files changed, 35 insertions(+), 41 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index f14a658e554..5a3839142a2 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -351,12 +351,12 @@ class reader::impl { /** * @brief Calculate the number of rows read from each source in the output chunk * - * @param start_row The offset of the first row in the output chunk - * @param num_rows The number of rows in the the output chunk + * @param chunk_start_row The offset of the first row in the output chunk + * @param chunk_num_rows The number of rows in the the output chunk * @return Vector of number of rows from each respective data source in the output chunk */ - [[nodiscard]] std::vector calculate_output_num_rows_per_source(size_t start_row, - size_t num_rows); + [[nodiscard]] std::vector calculate_output_num_rows_per_source(size_t chunk_start_row, + size_t chunk_num_rows); rmm::cuda_stream_view _stream; rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()}; diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 1b1c7aadb8f..29a065bd690 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1524,7 +1524,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) return std::tuple(cudf::concatenate(out_tviews), num_chunks, nrows_per_source); }; - // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = 
num_rows + // Chunked-read single data source entirely { auto constexpr output_read_limit = 1'500; auto constexpr pass_read_limit = 3'500; @@ -1541,7 +1541,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_rows_per_source[0], num_rows); } - // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read + // Chunked-read rows_to_read rows skipping rows_to_skip from single data source { auto const rows_to_skip = 1'237; auto const rows_to_read = 7'232; @@ -1568,7 +1568,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_rows_per_source[0], rows_to_read); } - // num_rows_per_source.size() must be = 2 + // Chunked-read two data sources skipping the first entire file completely { auto constexpr rows_to_skip = 15'723; auto constexpr output_read_limit = 1'500; @@ -1598,8 +1598,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_rows_per_source[1], nsources * num_rows - rows_to_skip); } - // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows - - // rows_to_read + // Chunked-read from single data source skipping rows_to_skip { auto const rows_to_skip = 1'237; auto constexpr output_read_limit = 1'500; @@ -1623,7 +1622,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) EXPECT_EQ(num_rows_per_source[0], num_rows - rows_to_skip); } - // num_rows_per_source must be empty + // Filtered chunked-read from single data source { int64_t const max_value = int64_data[int64_data.size() / 2]; auto constexpr output_read_limit = 1'500; @@ -1655,10 +1654,10 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) cudf::table_view expected_filtered({int64_col_filtered->view()}); CUDF_TEST_EXPECT_TABLES_EQUAL(expected_filtered, result->view()); - EXPECT_EQ(num_rows_per_source.empty(), true); + EXPECT_TRUE(num_rows_per_source.empty()); } - // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + // Chunked-read ten data sources entirely { auto const nsources = 10; auto constexpr output_read_limit = 15'000; @@ -1676,14 +1675,13 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) std::vector const expected_counts(nsources, num_rows); EXPECT_EQ(num_rows_per_source.size(), nsources); - EXPECT_EQ( - std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), - true); + EXPECT_TRUE( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } - // num_rows_per_source.size() must be = 10 + // Chunked-read rows_to_read rows skipping rows_to_skip from ten data sources { - auto const rows_to_skip = 5'571; + auto const rows_to_skip = 25'571; auto const rows_to_read = 47'232; auto constexpr output_read_limit = 15'000; auto constexpr pass_read_limit = 35'000; @@ -1729,12 +1727,11 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) } EXPECT_EQ(num_rows_per_source.size(), nsources); - EXPECT_EQ( - std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin()), - true); + EXPECT_TRUE( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } - // num_rows_per_source.size() must be = 10 + // Chunked-read ten data sources skipping rows_to_skip rows { auto const rows_to_skip = 5'571; auto constexpr output_read_limit = 15'000; @@ -1780,12 +1777,11 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) } EXPECT_EQ(num_rows_per_source.size(), nsources); - EXPECT_EQ( - std::equal(expected_counts.cbegin(), 
expected_counts.cend(), num_rows_per_source.cbegin()), - true); + EXPECT_TRUE( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } - // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = 0 + // Chunked-read ten empty data sources { auto constexpr output_read_limit = 4'500; auto constexpr pass_read_limit = 8'500; diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp index 7db934781b0..6c61535359f 100644 --- a/cpp/tests/io/parquet_reader_test.cpp +++ b/cpp/tests/io/parquet_reader_test.cpp @@ -2262,7 +2262,7 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) .build(); cudf::io::write_parquet(out_opts); - // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = num_rows + // Read single data source entirely { auto const in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); @@ -2273,7 +2273,7 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) EXPECT_EQ(result.metadata.num_rows_per_source[0], num_rows); } - // num_rows_per_source.size() must be = 1 and num_rows_per_source[0] must be = rows_to_read + // Read rows_to_read rows skipping rows_to_skip from single data source { auto constexpr rows_to_skip = 557; // a prime number != rows_in_row_group auto constexpr rows_to_read = 7'232; @@ -2293,7 +2293,7 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) EXPECT_EQ(result.metadata.num_rows_per_source[0], rows_to_read); } - // num_rows_per_source must be empty + // Filtered read from single data source { auto constexpr max_value = 100; auto literal_value = cudf::numeric_scalar{max_value}; @@ -2323,7 +2323,7 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) EXPECT_EQ(result.metadata.num_rows_per_source.size(), 0); } - // num_rows_per_source.size() must be = 2 + // Read two data sources skipping the first entire file completely { auto constexpr rows_to_skip = 15'723; auto constexpr nsources = 2; @@ -2348,7 +2348,7 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) EXPECT_EQ(result.metadata.num_rows_per_source[1], nsources * num_rows - rows_to_skip); } - // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + // Read ten data sources entirely { auto constexpr nsources = 10; std::vector const datasources(nsources, filepath); @@ -2362,11 +2362,11 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); EXPECT_TRUE(std::equal(expected_counts.cbegin(), - expected_counts.cend(), - result.metadata.num_rows_per_source.cbegin())); + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin())); } - // num_rows_per_source.size() must be = 10 and all num_rows_per_source[k] must be = num_rows + // Read rows_to_read rows skipping rows_to_skip (> two sources) from ten data sources { auto constexpr rows_to_skip = 25'999; auto constexpr rows_to_read = 47'232; @@ -2410,10 +2410,9 @@ TEST_F(ParquetReaderTest, NumRowsPerSource) } EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); - EXPECT_EQ(std::equal(expected_counts.cbegin(), - expected_counts.cend(), - result.metadata.num_rows_per_source.cbegin()), - true); + EXPECT_TRUE(std::equal(expected_counts.cbegin(), + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin())); } } @@ -2442,10 +2441,9 @@ TEST_F(ParquetReaderTest, NumRowsPerSourceEmptyTable) std::vector const expected_counts(nsources, 0); EXPECT_EQ(result.metadata.num_rows_per_source.size(), nsources); - 
EXPECT_EQ(std::equal(expected_counts.cbegin(), - expected_counts.cend(), - result.metadata.num_rows_per_source.cbegin()), - true); + EXPECT_TRUE(std::equal(expected_counts.cbegin(), + expected_counts.cend(), + result.metadata.num_rows_per_source.cbegin())); } /////////////////// From 6f7d203f4cdc7a5a7ba925ca5407f45e2f713ecd Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 17 Jul 2024 03:11:13 +0000 Subject: [PATCH 16/22] Finally fixed the segfault when skip_rows > row group size --- cpp/src/io/parquet/reader_impl_chunking.cu | 46 ++--- cpp/src/io/parquet/reader_impl_helpers.cpp | 7 +- cpp/tests/io/parquet_chunked_reader_test.cu | 190 +++++++++++++------- 3 files changed, 153 insertions(+), 90 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 8464d7ca180..038c58076b8 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1233,22 +1233,18 @@ void reader::impl::setup_next_pass(read_mode mode) pass.num_rows = _file_itm_data.global_num_rows; } else { auto const global_start_row = _file_itm_data.global_skip_rows; - auto const global_end_row = global_start_row + _file_itm_data.global_num_rows; + auto const global_num_rows = _file_itm_data.global_num_rows; auto const start_row = - std::max(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass], - global_start_row); + _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; auto const end_row = std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1], - global_end_row); + global_num_rows); // skip_rows is always global in the sense that it is relative to the first row of // everything we will be reading, regardless of what pass we are on. // num_rows is how many rows we are reading this pass. - pass.skip_rows = - _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; + pass.skip_rows = global_start_row + start_row; - // global_start_row should only be added to the first input pass and not to subsequent ones. - pass.skip_rows += (_file_itm_data._current_input_pass == 0) ? global_start_row : 0; pass.num_rows = end_row - start_row; } @@ -1568,14 +1564,8 @@ void reader::impl::create_global_chunk_info() remaining_rows -= (skip_rows) ? std::min(rg.start_row + row_group.num_rows - skip_rows, remaining_rows) : row_group_rows; - - // Adjust the start_row of the first row group which was left unadjusted during - // select_row_groups(). - if (skip_rows) { - rg.start_row = (skip_rows) ? skip_rows : rg.start_row; - // Set skip_rows = 0 as it is no longer needed for subsequent row_groups - skip_rows = 0; - } + // Set skip_rows = 0 as it is no longer needed for subsequent row_groups + skip_rows = 0; } } @@ -1583,7 +1573,7 @@ void reader::impl::compute_input_passes() { // at this point, row_groups has already been filtered down to just the row groups we need to // handle optional skip_rows/num_rows parameters. - auto const& row_groups_info = _file_itm_data.row_groups; + auto& row_groups_info = _file_itm_data.row_groups; // if the user hasn't specified an input size limit, read everything in a single pass. 
if (_input_pass_read_limit == 0) { @@ -1612,14 +1602,30 @@ void reader::impl::compute_input_passes() _file_itm_data.input_pass_row_group_offsets.push_back(0); _file_itm_data.input_pass_start_row_count.push_back(0); + // To handle global_skip_rows when computing input passes + int skip_rows = _file_itm_data.global_skip_rows; + for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { - auto const& rgi = row_groups_info[cur_rg_index]; + auto& rgi = row_groups_info[cur_rg_index]; auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); // total compressed size and total size (compressed + uncompressed) for auto const [compressed_rg_size, _ /*compressed + uncompressed*/] = get_row_group_size(row_group); + // We must use the effective size of the first row group we are reading to accurately calculate + // the first non-zero input_pass_start_row_count. + auto row_group_rows = + (skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows; + + // Adjust the start_row of the first row group which was left unadjusted during + // select_row_groups(). + if (skip_rows) { + rgi.start_row = skip_rows; + // Set skip_rows = 0 as it is no longer needed for subsequent row_groups + skip_rows = 0; + } + // can we add this row group if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) { // A single row group (the current one) is larger than the read limit: @@ -1627,7 +1633,7 @@ void reader::impl::compute_input_passes() // row group if (cur_rg_start == cur_rg_index) { _file_itm_data.input_pass_row_group_offsets.push_back(cur_rg_index + 1); - _file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group.num_rows); + _file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group_rows); cur_rg_start = cur_rg_index + 1; cur_pass_byte_size = 0; } @@ -1641,7 +1647,7 @@ void reader::impl::compute_input_passes() } else { cur_pass_byte_size += compressed_rg_size; } - cur_row_count += row_group.num_rows; + cur_row_count += row_group_rows; } // add the last pass if necessary diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 4a5866191e0..83105a93d18 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1015,9 +1015,12 @@ aggregate_reader_metadata::select_row_groups( num_rows_per_source[src_idx] += count - chunk_start_row_this_rg; // We need the unadjusted start index of this row group to correctly initialize - // ColumnChunkDesc for this row group in create_global_chunk_info(). + // ColumnChunkDesc for this row group in create_global_chunk_info() and correctly + // calculate the row offset for the first pass in compute_input_passes(). selection.emplace_back(rg_idx, chunk_start_row, src_idx); - // if page-level indexes are present, then collect extra chunk and page info. + + // If page-level indexes are present, then collect extra chunk and page info. + // The page indexes rely on absolute row numbers, not adjusted for skip_rows. column_info_for_row_group(selection.back(), chunk_start_row); } // Adjust the number of rows for the last source file. 
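
(Illustration, not part of the patch series.) A minimal standalone sketch of the per-source row accounting that select_row_groups() performs in the hunk above: walk the row groups in order, clip the first contribution at rows_to_skip, and stop once rows_to_read is satisfied. The container of per-source row-group sizes, the function name, and the final trimming step are assumptions for illustration; the body of the patch's own "Adjust the number of rows for the last source file" branch is not visible in this hunk.

#include <algorithm>
#include <cstddef>
#include <vector>

std::vector<std::size_t> count_rows_per_source(
  std::vector<std::vector<std::size_t>> const& row_group_sizes_per_source,
  std::size_t rows_to_skip,
  std::size_t rows_to_read)
{
  std::vector<std::size_t> num_rows_per_source(row_group_sizes_per_source.size(), 0);
  std::size_t count = 0;  // running row count over all row groups, across sources
  for (std::size_t src = 0; src < row_group_sizes_per_source.size(); ++src) {
    for (auto const rg_rows : row_group_sizes_per_source[src]) {
      auto const chunk_start_row = count;  // first global row of this row group
      count += rg_rows;
      if (count > rows_to_skip) {
        // Rows this row group contributes, clipped at the skip boundary.
        num_rows_per_source[src] += count - std::max(chunk_start_row, rows_to_skip);
        // Once the requested window is exhausted, trim the overshoot and stop.
        if (count >= rows_to_skip + rows_to_read) {
          num_rows_per_source[src] -= count - (rows_to_skip + rows_to_read);
          return num_rows_per_source;
        }
      }
    }
  }
  return num_rows_per_source;
}

With two hypothetical sources of 10'723 rows each (as in the tests), rows_to_skip = 15'723 and rows_to_read covering the remainder, the sketch yields {0, 5'723}, matching the expectations in the "skipping the first entire file completely" test cases above.
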
diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 29a065bd690..1e967cdce98 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -149,6 +149,33 @@ auto chunked_read(std::string const& filepath, return chunked_read(vpath, output_limit, input_limit); } +auto const read_table_and_nrows_per_source(cudf::io::chunked_parquet_reader const& reader) +{ + auto out_tables = std::vector>{}; + int num_chunks = 0; + auto nrows_per_source = std::vector{}; + while (reader.has_next()) { + auto chunk = reader.read_chunk(); + out_tables.emplace_back(std::move(chunk.tbl)); + num_chunks++; + if (nrows_per_source.empty()) { + nrows_per_source = std::move(chunk.metadata.num_rows_per_source); + } else { + std::transform(chunk.metadata.num_rows_per_source.cbegin(), + chunk.metadata.num_rows_per_source.cend(), + nrows_per_source.begin(), + nrows_per_source.begin(), + std::plus()); + } + } + auto out_tviews = std::vector{}; + for (auto const& tbl : out_tables) { + out_tviews.emplace_back(tbl->view()); + } + + return std::tuple(cudf::concatenate(out_tviews), num_chunks, nrows_per_source); +} + } // namespace struct ParquetChunkedReaderTest : public cudf::test::BaseFixture {}; @@ -1498,32 +1525,6 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) cudf::io::default_max_page_size_bytes, rows_in_row_group); - auto const read_table_and_nrows_per_source = [](cudf::io::chunked_parquet_reader const& reader) { - auto out_tables = std::vector>{}; - int num_chunks = 0; - auto nrows_per_source = std::vector{}; - while (reader.has_next()) { - auto chunk = reader.read_chunk(); - out_tables.emplace_back(std::move(chunk.tbl)); - num_chunks++; - if (nrows_per_source.empty()) { - nrows_per_source = std::move(chunk.metadata.num_rows_per_source); - } else { - std::transform(chunk.metadata.num_rows_per_source.cbegin(), - chunk.metadata.num_rows_per_source.cend(), - nrows_per_source.begin(), - nrows_per_source.begin(), - std::plus()); - } - } - auto out_tviews = std::vector{}; - for (auto const& tbl : out_tables) { - out_tviews.emplace_back(tbl->view()); - } - - return std::tuple(cudf::concatenate(out_tviews), num_chunks, nrows_per_source); - }; - // Chunked-read single data source entirely { auto constexpr output_read_limit = 1'500; @@ -1571,8 +1572,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) // Chunked-read two data sources skipping the first entire file completely { auto constexpr rows_to_skip = 15'723; - auto constexpr output_read_limit = 1'500; - auto constexpr pass_read_limit = 3'500; + auto constexpr output_read_limit = 1'024'000; + auto constexpr pass_read_limit = 1'024'000; auto constexpr nsources = 2; std::vector const datasources(nsources, filepath); @@ -1602,7 +1603,7 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) { auto const rows_to_skip = 1'237; auto constexpr output_read_limit = 1'500; - auto constexpr pass_read_limit = 3'500; + auto constexpr pass_read_limit = 1'800; auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{filepath}) .skip_rows(rows_to_skip) @@ -1656,10 +1657,31 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) CUDF_TEST_EXPECT_TABLES_EQUAL(expected_filtered, result->view()); EXPECT_TRUE(num_rows_per_source.empty()); } +} + +TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSourceMultipleSources) +{ + constexpr int num_rows = 10'723; // A prime number + constexpr int rows_in_row_group = 500; + + // Table with single col of 
random int64 values + auto const int64_data = random_values(num_rows); + auto int64_col = int64s_col(int64_data.begin(), int64_data.end()).release(); + + std::vector> input_columns; + input_columns.emplace_back(std::move(int64_col)); - // Chunked-read ten data sources entirely + // Write to Parquet + auto const [expected, filepath] = write_file(input_columns, + "num_rows_per_source", + false, + false, + cudf::io::default_max_page_size_bytes, + rows_in_row_group); + + // Chunked-read six data sources entirely { - auto const nsources = 10; + auto const nsources = 6; auto constexpr output_read_limit = 15'000; auto constexpr pass_read_limit = 35'000; std::vector const datasources(nsources, filepath); @@ -1679,14 +1701,23 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } - // Chunked-read rows_to_read rows skipping rows_to_skip from ten data sources + // Chunked-read rows_to_read rows skipping rows_to_skip from eight data sources { auto const rows_to_skip = 25'571; - auto const rows_to_read = 47'232; + auto const rows_to_read = 41'232; auto constexpr output_read_limit = 15'000; auto constexpr pass_read_limit = 35'000; + auto const nsources = 8; + std::vector int64_selected_data{}; + int64_selected_data.reserve(nsources * num_rows); + + std::for_each( + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(nsources), + [&](auto const i) { + std::copy(int64_data.begin(), int64_data.end(), std::back_inserter(int64_selected_data)); + }); - auto const nsources = 10; std::vector const datasources(nsources, filepath); auto const options = @@ -1726,20 +1757,36 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) } } + // Initialize expected table + auto int64_col_selected = int64s_col(int64_selected_data.begin() + rows_to_skip, + int64_selected_data.begin() + +rows_to_skip + rows_to_read) + .release(); + + cudf::table_view const expected_selected({int64_col_selected->view()}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); EXPECT_EQ(num_rows_per_source.size(), nsources); EXPECT_TRUE( std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } - // Chunked-read ten data sources skipping rows_to_skip rows + // Chunked-read four data sources skipping three files completely { - auto const rows_to_skip = 5'571; + auto const nsources = 4; + int constexpr rows_to_skip = num_rows * 3 + 1; auto constexpr output_read_limit = 15'000; auto constexpr pass_read_limit = 35'000; + std::vector int64_selected_data{}; + int64_selected_data.reserve(nsources * num_rows); + + std::for_each( + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(nsources), + [&](auto const i) { + std::copy(int64_data.begin(), int64_data.end(), std::back_inserter(int64_selected_data)); + }); - auto const nsources = 10; std::vector const datasources(nsources, filepath); - auto const options = cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}) .skip_rows(rows_to_skip) @@ -1776,48 +1823,55 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSource) } } + // Initialize expected table + auto int64_col_selected = + int64s_col(int64_selected_data.begin() + rows_to_skip, int64_selected_data.end()).release(); + + cudf::table_view const expected_selected({int64_col_selected->view()}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_selected, result->view()); EXPECT_EQ(num_rows_per_source.size(), nsources); EXPECT_TRUE( 
std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } +} - // Chunked-read ten empty data sources - { - auto constexpr output_read_limit = 4'500; - auto constexpr pass_read_limit = 8'500; - auto const nsources = 10; +TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSourceEmptyTable) +{ + auto constexpr output_read_limit = 4'500; + auto constexpr pass_read_limit = 8'500; + auto const nsources = 10; - // Table with single col of random int64 values - auto int64_empty_col = int64s_col{}.release(); + // Table with single col of random int64 values + auto int64_empty_col = int64s_col{}.release(); - std::vector> input_empty_columns; - input_empty_columns.emplace_back(std::move(int64_empty_col)); + std::vector> input_empty_columns; + input_empty_columns.emplace_back(std::move(int64_empty_col)); - // Write to Parquet - auto const [expected_empty, filepath_empty] = write_file(input_empty_columns, - "num_rows_per_source_empty", - false, - false, - cudf::io::default_max_page_size_bytes, - rows_in_row_group); + // Write to Parquet + auto const [expected_empty, filepath_empty] = write_file(input_empty_columns, + "num_rows_per_source_empty", + false, + false, + cudf::io::default_max_page_size_bytes, + 500); - std::vector const datasources(nsources, filepath_empty); + std::vector const datasources(nsources, filepath_empty); - auto const options = - cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}).build(); - auto const reader = cudf::io::chunked_parquet_reader( - output_read_limit, pass_read_limit, options, cudf::get_default_stream()); + auto const options = + cudf::io::parquet_reader_options_builder(cudf::io::source_info{datasources}).build(); + auto const reader = cudf::io::chunked_parquet_reader( + output_read_limit, pass_read_limit, options, cudf::get_default_stream()); - auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); + auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); - // Initialize expected_counts - std::vector const expected_counts(nsources, 0); + // Initialize expected_counts + std::vector const expected_counts(nsources, 0); - CUDF_TEST_EXPECT_TABLES_EQUAL(expected_empty->view(), result->view()); + CUDF_TEST_EXPECT_TABLES_EQUAL(expected_empty->view(), result->view()); - EXPECT_EQ(num_chunks, 1); - EXPECT_EQ(num_rows_per_source.size(), nsources); - EXPECT_TRUE( - std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); - } + EXPECT_EQ(num_chunks, 1); + EXPECT_EQ(num_rows_per_source.size(), nsources); + EXPECT_TRUE( + std::equal(expected_counts.cbegin(), expected_counts.cend(), num_rows_per_source.cbegin())); } From c19e972f470049157f76d0de75866e74df5b8c9b Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 17 Jul 2024 03:17:53 +0000 Subject: [PATCH 17/22] Revert const to row_group_info as no longer needed --- cpp/src/io/parquet/reader_impl_chunking.cu | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 038c58076b8..6209632ec80 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1475,9 +1475,9 @@ void reader::impl::setup_next_subpass(read_mode mode) void reader::impl::create_global_chunk_info() { - auto const num_rows = _file_itm_data.global_num_rows; - auto& row_groups_info = _file_itm_data.row_groups; - auto& chunks = 
_file_itm_data.chunks; + auto const num_rows = _file_itm_data.global_num_rows; + auto const& row_groups_info = _file_itm_data.row_groups; + auto& chunks = _file_itm_data.chunks; // Descriptors for all the chunks that make up the selected columns auto const num_input_columns = _input_columns.size(); @@ -1508,7 +1508,7 @@ void reader::impl::create_global_chunk_info() // Initialize column chunk information auto remaining_rows = num_rows; auto skip_rows = _file_itm_data.global_skip_rows; - for (auto& rg : row_groups_info) { + for (auto const& rg : row_groups_info) { auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); auto const row_group_start = rg.start_row; auto const row_group_rows = std::min(remaining_rows, row_group.num_rows); From 513d3bbf4474bee1d18e9f8b1a4c3072d6081f4a Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 17 Jul 2024 21:07:36 +0000 Subject: [PATCH 18/22] Fix the free() invalid pointer at chunked reader destructor --- cpp/src/io/parquet/reader_impl_chunking.cu | 13 ++++--------- cpp/src/io/parquet/reader_impl_helpers.cpp | 4 ++-- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 6209632ec80..62758c53586 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1573,7 +1573,7 @@ void reader::impl::compute_input_passes() { // at this point, row_groups has already been filtered down to just the row groups we need to // handle optional skip_rows/num_rows parameters. - auto& row_groups_info = _file_itm_data.row_groups; + auto const& row_groups_info = _file_itm_data.row_groups; // if the user hasn't specified an input size limit, read everything in a single pass. if (_input_pass_read_limit == 0) { @@ -1606,7 +1606,7 @@ void reader::impl::compute_input_passes() int skip_rows = _file_itm_data.global_skip_rows; for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { - auto& rgi = row_groups_info[cur_rg_index]; + auto const& rgi = row_groups_info[cur_rg_index]; auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); // total compressed size and total size (compressed + uncompressed) for @@ -1618,13 +1618,8 @@ void reader::impl::compute_input_passes() auto row_group_rows = (skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows; - // Adjust the start_row of the first row group which was left unadjusted during - // select_row_groups(). - if (skip_rows) { - rgi.start_row = skip_rows; - // Set skip_rows = 0 as it is no longer needed for subsequent row_groups - skip_rows = 0; - } + // Set skip_rows = 0 as it is no longer needed for subsequent row_groups + skip_rows = 0; // can we add this row group if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) { diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 83105a93d18..598ac6789b2 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1015,8 +1015,8 @@ aggregate_reader_metadata::select_row_groups( num_rows_per_source[src_idx] += count - chunk_start_row_this_rg; // We need the unadjusted start index of this row group to correctly initialize - // ColumnChunkDesc for this row group in create_global_chunk_info() and correctly - // calculate the row offset for the first pass in compute_input_passes(). 
+ // ColumnChunkDesc for this row group in create_global_chunk_info() and calculate + // the row offset for the first pass in compute_input_passes(). selection.emplace_back(rg_idx, chunk_start_row, src_idx); // If page-level indexes are present, then collect extra chunk and page info. From c11b27d7ba3669d623c7b204d18c844707bfc743 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 17 Jul 2024 21:13:49 +0000 Subject: [PATCH 19/22] Minor code cleanup --- cpp/src/io/parquet/reader_impl_chunking.cu | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 62758c53586..130e7cdbd36 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1232,20 +1232,17 @@ void reader::impl::setup_next_pass(read_mode mode) pass.skip_rows = _file_itm_data.global_skip_rows; pass.num_rows = _file_itm_data.global_num_rows; } else { - auto const global_start_row = _file_itm_data.global_skip_rows; - auto const global_num_rows = _file_itm_data.global_num_rows; auto const start_row = _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; auto const end_row = std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1], - global_num_rows); + _file_itm_data.global_num_rows); // skip_rows is always global in the sense that it is relative to the first row of // everything we will be reading, regardless of what pass we are on. // num_rows is how many rows we are reading this pass. - pass.skip_rows = global_start_row + start_row; - - pass.num_rows = end_row - start_row; + pass.skip_rows = _file_itm_data.global_skip_rows + start_row; + pass.num_rows = end_row - start_row; } // load page information for the chunk. this retrieves the compressed bytes for all the From 6ae6f078d556f18b7b1eb6ae88fbfdfa8c884d8d Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 18 Jul 2024 17:56:27 +0000 Subject: [PATCH 20/22] Minor code improvements. --- cpp/src/io/parquet/reader_impl_chunking.cu | 2 +- cpp/src/io/parquet/reader_impl_helpers.cpp | 8 +- cpp/tests/io/parquet_chunked_reader_test.cu | 87 +++++++++------------ 3 files changed, 42 insertions(+), 55 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 130e7cdbd36..30f2a038264 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1612,7 +1612,7 @@ void reader::impl::compute_input_passes() // We must use the effective size of the first row group we are reading to accurately calculate // the first non-zero input_pass_start_row_count. - auto row_group_rows = + auto const row_group_rows = (skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows; // Set skip_rows = 0 as it is no longer needed for subsequent row_groups diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 598ac6789b2..61405708f01 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1009,10 +1009,10 @@ aggregate_reader_metadata::select_row_groups( count += rg.num_rows; if (count > rows_to_skip || count == 0) { // start row of this row group adjusted with rows_to_skip - auto const chunk_start_row_this_rg = - (chunk_start_row <= rows_to_skip and count > rows_to_skip) ? 
rows_to_skip - : chunk_start_row; - num_rows_per_source[src_idx] += count - chunk_start_row_this_rg; + num_rows_per_source[src_idx] += count; + num_rows_per_source[src_idx] -= (chunk_start_row <= rows_to_skip and count > rows_to_skip) + ? rows_to_skip + : chunk_start_row; // We need the unadjusted start index of this row group to correctly initialize // ColumnChunkDesc for this row group in create_global_chunk_info() and calculate diff --git a/cpp/tests/io/parquet_chunked_reader_test.cu b/cpp/tests/io/parquet_chunked_reader_test.cu index 1e967cdce98..2917852235c 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cu +++ b/cpp/tests/io/parquet_chunked_reader_test.cu @@ -1679,6 +1679,39 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSourceMultipleSources) cudf::io::default_max_page_size_bytes, rows_in_row_group); + // Function to initialize a vector of expected counts per source + auto initialize_expected_counts = + [](int const nsources, int const num_rows, int const rows_to_skip, int const rows_to_read) { + // Initialize expected_counts + std::vector expected_counts(nsources, num_rows); + + // Adjust expected_counts for rows_to_skip + int64_t counter = 0; + for (auto& nrows : expected_counts) { + if (counter < rows_to_skip) { + counter += nrows; + nrows = (counter >= rows_to_skip) ? counter - rows_to_skip : 0; + } else { + break; + } + } + + // Reset the counter + counter = 0; + + // Adjust expected_counts for rows_to_read + for (auto& nrows : expected_counts) { + if (counter < rows_to_read) { + counter += nrows; + nrows = (counter >= rows_to_read) ? rows_to_read - counter + nrows : nrows; + } else if (counter > rows_to_read) { + nrows = 0; + } + } + + return expected_counts; + }; + // Chunked-read six data sources entirely { auto const nsources = 6; @@ -1731,31 +1764,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSourceMultipleSources) auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); // Initialize expected_counts - std::vector expected_counts(nsources, num_rows); - - // Adjust expected_counts for rows_to_skip - int64_t counter = 0; - for (auto& nrows : expected_counts) { - if (counter < rows_to_skip) { - counter += nrows; - nrows = (counter >= rows_to_skip) ? counter - rows_to_skip : 0; - } else { - break; - } - } - - // Reset the counter - counter = 0; - - // Adjust expected_counts for rows_to_read - for (auto& nrows : expected_counts) { - if (counter < rows_to_read) { - counter += nrows; - nrows = (counter >= rows_to_read) ? rows_to_read - counter + nrows : nrows; - } else if (counter > rows_to_read) { - nrows = 0; - } - } + auto const expected_counts = + initialize_expected_counts(nsources, num_rows, rows_to_skip, rows_to_read); // Initialize expected table auto int64_col_selected = int64s_col(int64_selected_data.begin() + rows_to_skip, @@ -1797,31 +1807,8 @@ TEST_F(ParquetChunkedReaderTest, TestNumRowsPerSourceMultipleSources) auto const [result, num_chunks, num_rows_per_source] = read_table_and_nrows_per_source(reader); // Initialize expected_counts - std::vector expected_counts(nsources, num_rows); - - // Adjust expected_counts for rows_to_skip - int64_t counter = 0; - for (auto& nrows : expected_counts) { - if (counter < rows_to_skip) { - counter += nrows; - nrows = (counter >= rows_to_skip) ? 
counter - rows_to_skip : 0; - } else { - break; - } - } - - // Reset the counter - counter = 0; - - // Adjust expected_counts for rows_to_read - for (auto& nrows : expected_counts) { - if (counter < num_rows * nsources) { - counter += nrows; - nrows = (counter >= num_rows * nsources) ? (num_rows * nsources) - counter + nrows : nrows; - } else if (counter > num_rows * nsources) { - nrows = 0; - } - } + auto const expected_counts = + initialize_expected_counts(nsources, num_rows, rows_to_skip, num_rows * nsources); // Initialize expected table auto int64_col_selected = From 91e47353fa5ab6acf6ce1c55be71ad425a60f726 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Thu, 18 Jul 2024 23:33:37 +0000 Subject: [PATCH 21/22] Add helper function for include_output_num_rows_per_source --- cpp/src/io/parquet/reader_impl.cpp | 10 +++++----- cpp/src/io/parquet/reader_impl.hpp | 10 ++++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 6e0c3097700..8721ceddc48 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -551,8 +551,8 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) // no work to do (this can happen on the first pass if we have no rows to read) if (!has_more_work()) { - // Compute number of rows per source if no AST filters - if (not _expr_conv.get_converted_expr().has_value()) { + // Check if number of rows per source should be included in output metadata. + if (include_output_num_rows_per_source()) { // Empty dataframe case: Simply initialize to a list of zeros out_metadata.num_rows_per_source = std::vector(_file_itm_data.num_rows_per_source.size(), 0); @@ -596,8 +596,8 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) } } - // Compute number of rows per source if no AST filters - if (not _expr_conv.get_converted_expr().has_value()) { + // Check if number of rows per source should be included in output metadata. + if (include_output_num_rows_per_source()) { // For chunked reading, compute the output number of rows per source if (mode == read_mode::CHUNKED_READ) { out_metadata.num_rows_per_source = @@ -605,7 +605,7 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode) } // Simply move the number of rows per file if reading all at once else { - // Move is okay here we are reading in one go. + // Move is okay here as we are reading in one go. out_metadata.num_rows_per_source = std::move(_file_itm_data.num_rows_per_source); } } diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 5a3839142a2..5e3cc4301f9 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -348,6 +348,16 @@ class reader::impl { return _file_itm_data._output_chunk_count == 0; } + /** + * @brief Check if number of rows per source should be included in output metadata. 
+ * + * @return True if AST filter is not present + */ + [[nodiscard]] bool include_output_num_rows_per_source() const + { + return not _expr_conv.get_converted_expr().has_value(); + } + /** * @brief Calculate the number of rows read from each source in the output chunk * From ed4352a09899c50b9f53aac368626eff09dd75a7 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Fri, 19 Jul 2024 18:10:08 +0000 Subject: [PATCH 22/22] Applying suggestions from reviews --- cpp/src/io/parquet/reader_impl.cpp | 12 ++++++------ cpp/src/io/parquet/reader_impl_chunking.cu | 19 ++++++++++++------- cpp/src/io/parquet/reader_impl_helpers.cpp | 5 ++--- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8721ceddc48..68ec61ead0a 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -637,12 +637,12 @@ std::vector reader::impl::calculate_output_num_rows_per_source(size_t co auto const& partial_sum_nrows_source = _file_itm_data.exclusive_sum_num_rows_per_source; // Binary search start_row and end_row in exclusive_sum_num_rows_per_source vector - auto const start_iter = thrust::upper_bound( - partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row); + auto const start_iter = + std::upper_bound(partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row); auto const end_iter = (end_row == _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows) ? partial_sum_nrows_source.cend() - 1 - : thrust::upper_bound(start_iter, partial_sum_nrows_source.cend(), end_row); + : std::upper_bound(start_iter, partial_sum_nrows_source.cend(), end_row); // Compute the array offset index for both iterators auto const start_idx = std::distance(partial_sum_nrows_source.cbegin(), start_iter); @@ -660,9 +660,9 @@ std::vector reader::impl::calculate_output_num_rows_per_source(size_t co // Compute the number of rows from the last source file num_rows_per_source[end_idx] = end_row - partial_sum_nrows_source[end_idx - 1]; // Simply copy the number of rows for each source in range: (start_idx, end_idx) - thrust::copy(_file_itm_data.num_rows_per_source.cbegin() + start_idx + 1, - _file_itm_data.num_rows_per_source.cbegin() + end_idx, - num_rows_per_source.begin() + start_idx + 1); + std::copy(_file_itm_data.num_rows_per_source.cbegin() + start_idx + 1, + _file_itm_data.num_rows_per_source.cbegin() + end_idx, + num_rows_per_source.begin() + start_idx + 1); } return num_rows_per_source; diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 30f2a038264..05e0d8c0111 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -1232,17 +1232,22 @@ void reader::impl::setup_next_pass(read_mode mode) pass.skip_rows = _file_itm_data.global_skip_rows; pass.num_rows = _file_itm_data.global_num_rows; } else { - auto const start_row = + // pass_start_row and pass_end_row are computed from the selected row groups relative to the + // global_skip_rows. 
+ auto const pass_start_row = _file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; - auto const end_row = + auto const pass_end_row = std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1], _file_itm_data.global_num_rows); - // skip_rows is always global in the sense that it is relative to the first row of - // everything we will be reading, regardless of what pass we are on. - // num_rows is how many rows we are reading this pass. - pass.skip_rows = _file_itm_data.global_skip_rows + start_row; - pass.num_rows = end_row - start_row; + // pass.skip_rows is always global in the sense that it is relative to the first row of + // the data source (global row number 0), regardless of what pass we are on. Therefore, + // we must re-add global_skip_rows to the pass_start_row which is relative to the + // global_skip_rows. + pass.skip_rows = _file_itm_data.global_skip_rows + pass_start_row; + // num_rows is how many rows we are reading this pass. Since this is a difference, adding + // global_skip_rows to both variables is redundant. + pass.num_rows = pass_end_row - pass_start_row; } // load page information for the chunk. this retrieves the compressed bytes for all the diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 61405708f01..581c44d024b 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1010,9 +1010,8 @@ aggregate_reader_metadata::select_row_groups( if (count > rows_to_skip || count == 0) { // start row of this row group adjusted with rows_to_skip num_rows_per_source[src_idx] += count; - num_rows_per_source[src_idx] -= (chunk_start_row <= rows_to_skip and count > rows_to_skip) - ? rows_to_skip - : chunk_start_row; + num_rows_per_source[src_idx] -= + (chunk_start_row <= rows_to_skip) ? rows_to_skip : chunk_start_row; // We need the unadjusted start index of this row group to correctly initialize // ColumnChunkDesc for this row group in create_global_chunk_info() and calculate
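
(Illustration, not part of the patch series.) A standalone sketch of the chunk-splitting logic introduced for chunked reads: given the per-source row counts and a running prefix sum over them, an output chunk [chunk_start_row, chunk_start_row + chunk_num_rows) is apportioned to sources with two upper_bound searches, partial counts for the first and last overlapping sources, and full counts in between. This mirrors calculate_output_num_rows_per_source() from the patches above, but uses plain std:: algorithms, works on rows relative to the selected range (the reader itself works in row numbers offset by global_skip_rows), and folds in the base cases from "Handle base cases when calculating num_rows_per_source"; the function name is an assumption.

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <numeric>
#include <vector>

std::vector<std::size_t> split_chunk_across_sources(
  std::vector<std::size_t> const& num_rows_per_source,  // rows selected from each source
  std::size_t chunk_start_row,                          // first row of this output chunk
  std::size_t chunk_num_rows)                           // number of rows in this output chunk
{
  // Base cases: no sources, or a single source that must own the whole chunk.
  if (num_rows_per_source.empty()) { return {}; }
  if (num_rows_per_source.size() == 1) { return {chunk_num_rows}; }

  // Running prefix sum: partial_sum[i] is the (exclusive) end offset of source i.
  std::vector<std::size_t> partial_sum(num_rows_per_source.size());
  std::partial_sum(num_rows_per_source.cbegin(), num_rows_per_source.cend(), partial_sum.begin());

  auto const chunk_end_row = chunk_start_row + chunk_num_rows;
  std::vector<std::size_t> out(num_rows_per_source.size(), 0);

  // Find the first and last sources overlapping [chunk_start_row, chunk_end_row).
  auto const start_it = std::upper_bound(partial_sum.cbegin(), partial_sum.cend(), chunk_start_row);
  auto const end_it   = (chunk_end_row == partial_sum.back())
                          ? partial_sum.cend() - 1
                          : std::upper_bound(start_it, partial_sum.cend(), chunk_end_row);
  auto const start_idx = std::distance(partial_sum.cbegin(), start_it);
  auto const end_idx   = std::distance(partial_sum.cbegin(), end_it);

  if (start_idx == end_idx) {
    // The entire chunk comes from a single source.
    out[start_idx] = chunk_num_rows;
  } else {
    // Partial rows from the first and last overlapping sources ...
    out[start_idx] = partial_sum[start_idx] - chunk_start_row;
    out[end_idx]   = chunk_end_row - partial_sum[end_idx - 1];
    // ... and full per-source counts for everything in between.
    std::copy(num_rows_per_source.cbegin() + start_idx + 1,
              num_rows_per_source.cbegin() + end_idx,
              out.begin() + start_idx + 1);
  }
  return out;
}

Summing the vectors returned for each chunk element-wise, as the test helper read_table_and_nrows_per_source() does with std::transform and std::plus, recovers the per-source totals for the whole chunked read.
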