diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index fcfea35f50c..8b86412ae63 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -130,10 +130,21 @@ class reader::impl {
                     bool uses_custom_row_bounds,
                     host_span<std::vector<size_type> const> row_group_indices);
 
+  /**
+   * @brief Create chunk information and start file reads
+   *
+   * @param row_groups_info vector of information about row groups to read
+   * @param num_rows Maximum number of rows to read
+   * @return pair of boolean indicating if compressed chunks were found and a vector of futures for
+   * read completion
+   */
+  std::pair<bool, std::vector<std::future<void>>> create_and_read_column_chunks(
+    cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows);
+
   /**
    * @brief Load and decompress the input file(s) into memory.
    */
-  void load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
+  void load_and_decompress_data(cudf::host_span<row_group_info const> const row_groups_info,
                                 size_type num_rows);
 
   /**
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index b1d013a96a3..0f55cd6e400 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -651,16 +651,11 @@ void reader::impl::allocate_nesting_info()
   page_nesting_decode_info.host_to_device(_stream);
 }
 
-void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
-                                            size_type num_rows)
+std::pair<bool, std::vector<std::future<void>>> reader::impl::create_and_read_column_chunks(
+  cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
 {
-  // This function should never be called if `num_rows == 0`.
-  CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero.");
-
-  auto& raw_page_data    = _file_itm_data.raw_page_data;
-  auto& decomp_page_data = _file_itm_data.decomp_page_data;
-  auto& chunks           = _file_itm_data.chunks;
-  auto& pages_info       = _file_itm_data.pages_info;
+  auto& raw_page_data = _file_itm_data.raw_page_data;
+  auto& chunks        = _file_itm_data.chunks;
 
   // Descriptors for all the chunks that make up the selected columns
   const auto num_input_columns = _input_columns.size();
@@ -732,7 +727,7 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& r
         total_decompressed_size += col_meta.total_uncompressed_size;
       }
     }
-    remaining_rows -= row_group.num_rows;
+    remaining_rows -= row_group_rows;
   }
 
   // Read compressed chunk data to device memory
@@ -745,12 +740,29 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& r
                                      chunk_source_map,
                                      _stream));
 
+  CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read.");
+
+  return {total_decompressed_size > 0, std::move(read_rowgroup_tasks)};
+}
+
+void reader::impl::load_and_decompress_data(
+  cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
+{
+  // This function should never be called if `num_rows == 0`.
+  CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero.");
+
+  auto& raw_page_data    = _file_itm_data.raw_page_data;
+  auto& decomp_page_data = _file_itm_data.decomp_page_data;
+  auto& chunks           = _file_itm_data.chunks;
+  auto& pages_info       = _file_itm_data.pages_info;
+
+  auto const [has_compressed_data, read_rowgroup_tasks] =
+    create_and_read_column_chunks(row_groups_info, num_rows);
+
   for (auto& task : read_rowgroup_tasks) { task.wait(); }
 
-  CUDF_EXPECTS(remaining_rows <= 0, "All rows data must be read.");
-
   // Process dataset chunk pages into output columns
   auto const total_pages = count_page_headers(chunks, _stream);
   pages_info             = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);
@@ -758,14 +770,11 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& r
   if (total_pages > 0) {
     // decoding of column/page information
     decode_page_headers(chunks, pages_info, _stream);
-    if (total_decompressed_size > 0) {
+    if (has_compressed_data) {
       decomp_page_data = decompress_page_data(chunks, pages_info, _stream);
       // Free compressed data
       for (size_t c = 0; c < chunks.size(); c++) {
-        if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) {
-          raw_page_data[c].reset();
-          // TODO: Check if this is called
-        }
+        if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); }
      }
    }
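For context, the refactor above follows a common split: one function kicks off asynchronous chunk reads and returns a pair of (has_compressed_data, read futures), and the caller unpacks that pair with a structured binding, waits on the futures, and runs decompression only when the flag is set. Below is a minimal, self-contained sketch of that pattern under stated assumptions; the names (ChunkReader, start_reads, load_all) and the "even chunks are compressed" heuristic are hypothetical stand-ins for illustration, not cudf's actual API.

// Sketch of the kickoff/wait split used by the refactor above.
// Hypothetical names; not cudf code.
#include <future>
#include <iostream>
#include <utility>
#include <vector>

struct ChunkReader {
  // Stands in for create_and_read_column_chunks(): launch one async read
  // per chunk and report whether any chunk needs decompression.
  std::pair<bool, std::vector<std::future<void>>> start_reads(int num_chunks)
  {
    std::vector<std::future<void>> tasks;
    bool any_compressed = false;
    for (int c = 0; c < num_chunks; ++c) {
      any_compressed = any_compressed || (c % 2 == 0);  // pretend even chunks are compressed
      tasks.push_back(std::async(std::launch::async, [c] {
        // A real reader would copy chunk `c` from the file into device memory here.
        (void)c;
      }));
    }
    return {any_compressed, std::move(tasks)};
  }

  // Stands in for load_and_decompress_data(): wait for the reads, then
  // decompress only if the kickoff step saw compressed chunks.
  void load_all(int num_chunks)
  {
    auto [has_compressed_data, tasks] = start_reads(num_chunks);
    for (auto& t : tasks) { t.wait(); }
    if (has_compressed_data) {
      std::cout << "decompressing pages\n";  // decompress_page_data() analogue
    }
  }
};

int main() { ChunkReader{}.load_all(4); }

Returning the flag from the kickoff step is what lets the caller drop its dependence on `total_decompressed_size`: the diff replaces the `total_decompressed_size > 0` check with the `has_compressed_data` result carried across the new function boundary.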