From b22ddad1a596785e7f7b01832a871370100cd7cb Mon Sep 17 00:00:00 2001 From: db Date: Mon, 11 Sep 2023 14:36:52 -0500 Subject: [PATCH 1/5] Add support for progressive chunked reading. Specifically, limit the amount of memory used for decompression and other scratch space when decoding, which causes the reader to make multiple 'passes' over the set of row groups to be read. Signed-off-by: db --- cpp/include/cudf/io/detail/parquet.hpp | 11 +- cpp/include/cudf/io/parquet.hpp | 24 ++ cpp/src/io/functions.cpp | 17 + cpp/src/io/parquet/parquet_gpu.hpp | 69 +++- cpp/src/io/parquet/reader.cpp | 4 +- cpp/src/io/parquet/reader_impl.cpp | 128 ++++--- cpp/src/io/parquet/reader_impl.hpp | 50 ++- cpp/src/io/parquet/reader_impl_helpers.cpp | 4 +- cpp/src/io/parquet/reader_impl_helpers.hpp | 15 +- cpp/src/io/parquet/reader_impl_preprocess.cu | 351 +++++++++++++------ cpp/src/io/utilities/hostdevice_vector.hpp | 8 + 11 files changed, 483 insertions(+), 198 deletions(-) diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index 3f2e1fa5e6c..152e28b9a14 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -91,7 +91,8 @@ class reader { class chunked_reader : private reader { public: /** - * @brief Constructor from a read size limit and an array of data sources with reader options. + * @brief Constructor from an output size memory limit and an input size memory limit and an array + * of data sources with reader options. * * The typical usage should be similar to this: * ``` @@ -102,17 +103,21 @@ class chunked_reader : private reader { * * ``` * - * If `chunk_read_limit == 0` (i.e., no reading limit), a call to `read_chunk()` will read the - * whole file and return a table containing all rows. + * If `chunk_read_limit == 0` (i.e., no output limit), and `pass_read_limit == 0` (no input + * temporary memory size limit) a call to `read_chunk()` will read the whole file and return a + * table containing all rows. * * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit + * @param pass_read_limit Limit on total amount of memory used for temporary computations during + * loading, or `0` if there is no limit. * @param sources Input `datasource` objects to read the dataset from * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource to use for device memory allocation */ explicit chunked_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 788ff15f3c1..dd6304a43e2 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -445,6 +445,30 @@ class chunked_parquet_reader { parquet_reader_options const& options, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** + * @brief Constructor for chunked reader. + * + * This constructor requires the same `parquet_reader_option` parameter as in + * `cudf::read_parquet()`, with additional parameters to specify the size byte limit of the + * output table for each reading, and a byte limit on the amount of temporary memory to use + * when reading. pass_read_limit affects how many row groups we can read at a time by limiting + * the amount of memory dedicated to decompression space. pass_read_limit is a hint, not an + * absolute limit - if a single row group cannot fit within the limit given, it will still be + * loaded. + * + * @param chunk_read_limit Limit on total number of bytes to be returned per read, + * or `0` if there is no limit + * @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or + * `0` if there is no limit. + * @param options The options used to read Parquet file + * @param mr Device memory resource to use for device memory allocation + */ + chunked_parquet_reader( + std::size_t chunk_read_limit, + std::size_t pass_read_limit, + parquet_reader_options const& options, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + /** * @brief Destructor, destroying the internal reader instance. * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 5adb2046dbd..63e0b5eb940 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -542,6 +542,23 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, parquet_reader_options const& options, rmm::mr::device_memory_resource* mr) : reader{std::make_unique(chunk_read_limit, + 0, + make_datasources(options.get_source()), + options, + cudf::get_default_stream(), + mr)} +{ +} + +/** + * @copydoc cudf::io::chunked_parquet_reader::chunked_parquet_reader + */ +chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, + parquet_reader_options const& options, + rmm::mr::device_memory_resource* mr) + : reader{std::make_unique(chunk_read_limit, + pass_read_limit, make_datasources(options.get_source()), options, cudf::get_default_stream(), diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index e82b6abc13d..4176949064b 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -306,33 +306,74 @@ struct ColumnChunkDesc { }; /** - * @brief Struct to store raw/intermediate file data before parsing. + * @brief The row_group_info class + */ +struct row_group_info { + size_type index; + size_t start_row; // TODO source index + size_type source_index; + + row_group_info() = default; + + row_group_info(size_type index, size_t start_row, size_type source_index) + : index(index), start_row(start_row), source_index(source_index) + { + } +}; + +/** + * @brief Struct to store file-level data that remains constant for + * all passes/chunks for the file. */ struct file_intermediate_data { + // all row groups to read + std::vector row_groups{}; + + // all chunks from the selected row groups. We may end up reading these chunks progressively + // instead of all at once + std::vector chunks{}; + + // skip_rows/num_rows values for the entire file. these need to be adjusted per-pass because we + // may not be visiting every row group that contains these bounds + size_t global_skip_rows; + size_t global_num_rows; +}; + +/** + * @brief Structs to identify the reading row range for each chunk of rows in chunked reading. + */ +struct chunk_read_info { + size_t skip_rows; + size_t num_rows; +}; + +/** + * @brief Struct to store pass-level data that remains constant for + * a single pass. + */ +struct pass_intermediate_data { std::vector> raw_page_data; rmm::device_buffer decomp_page_data; + + // rowgroup, chunk and page information for the current pass. + std::vector row_groups{}; cudf::detail::hostdevice_vector chunks{}; cudf::detail::hostdevice_vector pages_info{}; cudf::detail::hostdevice_vector page_nesting_info{}; cudf::detail::hostdevice_vector page_nesting_decode_info{}; - rmm::device_buffer level_decode_data; - int level_type_size; -}; - -/** - * @brief Struct to store intermediate page data for parsing each chunk of rows in chunked reading. - */ -struct chunk_intermediate_data { rmm::device_uvector page_keys{0, rmm::cuda_stream_default}; rmm::device_uvector page_index{0, rmm::cuda_stream_default}; rmm::device_uvector str_dict_index{0, rmm::cuda_stream_default}; -}; -/** - * @brief Structs to identify the reading row range for each chunk of rows in chunked reading. - */ -struct chunk_read_info { + std::vector output_chunk_read_info; + std::size_t current_output_chunk{0}; + + rmm::device_buffer level_decode_data; + int level_type_size; + + // skip_rows and num_rows values for this particular pass. these may be adjusted values from the + // global values stored in file_intermediate_data. size_t skip_rows; size_t num_rows; }; diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp index 7365c102d8f..1e87447006d 100644 --- a/cpp/src/io/parquet/reader.cpp +++ b/cpp/src/io/parquet/reader.cpp @@ -43,12 +43,14 @@ table_with_metadata reader::read(parquet_reader_options const& options) } chunked_reader::chunked_reader(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { - _impl = std::make_unique(chunk_read_limit, std::move(sources), options, stream, mr); + _impl = std::make_unique( + chunk_read_limit, pass_read_limit, std::move(sources), options, stream, mr); } chunked_reader::~chunked_reader() = default; diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 9289ddb91b3..abb73ff6a83 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -45,10 +45,10 @@ auto& get_stream_pool() void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto& page_nesting = _file_itm_data.page_nesting_info; - auto& page_nesting_decode = _file_itm_data.page_nesting_decode_info; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; + auto& page_nesting = _pass_itm_data->page_nesting_info; + auto& page_nesting_decode = _pass_itm_data->page_nesting_decode_info; // Should not reach here if there is no page data. CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -71,7 +71,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) std::vector col_sizes(_input_columns.size(), 0L); if (has_strings) { gpu::ComputePageStringSizes( - pages, chunks, skip_rows, num_rows, _file_itm_data.level_type_size, _stream); + pages, chunks, skip_rows, num_rows, _pass_itm_data->level_type_size, _stream); col_sizes = calculate_page_string_offsets(); @@ -180,7 +180,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) chunk_nested_data.host_to_device_async(_stream); _stream.synchronize(); - auto const level_type_size = _file_itm_data.level_type_size; + auto const level_type_size = _pass_itm_data->level_type_size; // vector of launched streams std::vector streams; @@ -283,6 +283,7 @@ reader::impl::impl(std::vector>&& sources, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) : impl(0 /*chunk_read_limit*/, + 0 /*input_pass_read_limit*/, std::forward>>(sources), options, stream, @@ -291,11 +292,16 @@ reader::impl::impl(std::vector>&& sources, } reader::impl::impl(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : _stream{stream}, _mr{mr}, _sources{std::move(sources)}, _chunk_read_limit{chunk_read_limit} + : _stream{stream}, + _mr{mr}, + _sources{std::move(sources)}, + _output_chunk_read_limit{chunk_read_limit}, + _input_pass_read_limit{pass_read_limit} { // Open and parse the source dataset metadata _metadata = std::make_unique(_sources); @@ -319,11 +325,8 @@ reader::impl::impl(std::size_t chunk_read_limit, _timestamp_type.id()); // Save the states of the output buffers for reuse in `chunk_read()`. - // Don't need to do it if we read the file all at once. - if (_chunk_read_limit > 0) { - for (auto const& buff : _output_buffers) { - _output_buffers_template.emplace_back(inline_column_buffer::empty_like(buff)); - } + for (auto const& buff : _output_buffers) { + _output_buffers_template.emplace_back(inline_column_buffer::empty_like(buff)); } } @@ -333,32 +336,62 @@ void reader::impl::prepare_data(int64_t skip_rows, host_span const> row_group_indices, std::optional> filter) { - if (_file_preprocessed) { return; } + // if we have not preprocessed at the whole-file level, do that now + if (!_file_preprocessed) { + // if filter is not empty, then create output types as vector and pass for filtering. + std::vector output_types; + if (filter.has_value()) { + std::transform(_output_buffers.cbegin(), + _output_buffers.cend(), + std::back_inserter(output_types), + [](auto const& col) { return col.type; }); + } + std::tie( + _file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) = + _metadata->select_row_groups( + row_group_indices, skip_rows, num_rows, output_types, filter, _stream); + + if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() && + not _input_columns.empty()) { + // fills in chunk information without physically loading or decompressing + // the associated data + load_global_chunk_info(); + + // compute schedule of input reads. Each rowgroup contains 1 chunk per column. For now + // we will read an entire row group at a time. However, it is possible to do + // sub-rowgroup reads if we made some estimates on individual chunk sizes (tricky) and + // changed the high level structure such that we weren't always reading an entire table's + // worth of columns at once. + compute_input_pass_row_group_info(); + } - // if filter is not empty, then create output types as vector and pass for filtering. - std::vector output_types; - if (filter.has_value()) { - std::transform(_output_buffers.cbegin(), - _output_buffers.cend(), - std::back_inserter(output_types), - [](auto const& col) { return col.type; }); + _file_preprocessed = true; } - auto const [skip_rows_corrected, num_rows_corrected, row_groups_info] = - _metadata->select_row_groups( - row_group_indices, skip_rows, num_rows, output_types, filter, _stream); - - if (num_rows_corrected > 0 && not row_groups_info.empty() && not _input_columns.empty()) { - load_and_decompress_data(row_groups_info, num_rows_corrected); - preprocess_pages( - skip_rows_corrected, num_rows_corrected, uses_custom_row_bounds, _chunk_read_limit); - - if (_chunk_read_limit == 0) { // read the whole file at once - CUDF_EXPECTS(_chunk_read_info.size() == 1, - "Reading the whole file should yield only one chunk."); + + // if we have to start a new pass, do that now + if (!_pass_preprocessed) { + auto const num_passes = _input_pass_row_group_indices.size() - 1; + + // always create the pass struct, even if we end up with no passes. + // this will also cause the previous pass information to be deleted + _pass_itm_data = std::make_unique(); + + if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() && + not _input_columns.empty() && _current_input_pass < num_passes) { + // setup the pass_intermediate_info for this pass. + setup_pass(); + + load_and_decompress_data(); + preprocess_pages(uses_custom_row_bounds, _output_chunk_read_limit); + + if (_output_chunk_read_limit == 0) { // read the whole file at once + CUDF_EXPECTS(_pass_itm_data->output_chunk_read_info.size() == 1, + "Reading the whole file should yield only one chunk."); + } } - } - _file_preprocessed = true; + _pass_preprocessed = true; + } } void reader::impl::populate_metadata(table_metadata& out_metadata) @@ -387,11 +420,12 @@ table_with_metadata reader::impl::read_chunk_internal( auto out_columns = std::vector>{}; out_columns.reserve(_output_buffers.size()); - if (!has_next() || _chunk_read_info.empty()) { + if (!has_next() || _pass_itm_data->output_chunk_read_info.empty()) { return finalize_output(out_metadata, out_columns, filter); } - auto const& read_info = _chunk_read_info[_current_read_chunk++]; + auto const& read_info = + _pass_itm_data->output_chunk_read_info[_pass_itm_data->current_output_chunk]; // Allocate memory buffers for the output columns. allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds); @@ -444,6 +478,17 @@ table_with_metadata reader::impl::finalize_output( _output_metadata = std::make_unique(out_metadata); } + // advance chunks/passes as necessary + _pass_itm_data->current_output_chunk++; + _chunk_count++; + if (_pass_itm_data->current_output_chunk >= _pass_itm_data->output_chunk_read_info.size()) { + _pass_itm_data->current_output_chunk = 0; + _pass_itm_data->output_chunk_read_info.clear(); + + _current_input_pass++; + _pass_preprocessed = false; + } + if (filter.has_value()) { auto read_table = std::make_unique(std::move(out_columns)); auto predicate = cudf::detail::compute_column( @@ -463,7 +508,8 @@ table_with_metadata reader::impl::read( host_span const> row_group_indices, std::optional> filter) { - CUDF_EXPECTS(_chunk_read_limit == 0, "Reading the whole file must not have non-zero byte_limit."); + CUDF_EXPECTS(_output_chunk_read_limit == 0, + "Reading the whole file must not have non-zero byte_limit."); table_metadata metadata; populate_metadata(metadata); auto expr_conv = named_to_reference_converter(filter, metadata); @@ -477,7 +523,7 @@ table_with_metadata reader::impl::read_chunk() { // Reset the output buffers to their original states (right after reader construction). // Don't need to do it if we read the file all at once. - if (_chunk_read_limit > 0) { + if (_chunk_count > 0) { _output_buffers.resize(0); for (auto const& buff : _output_buffers_template) { _output_buffers.emplace_back(inline_column_buffer::empty_like(buff)); @@ -499,7 +545,11 @@ bool reader::impl::has_next() true /*uses_custom_row_bounds*/, {} /*row_group_indices, empty means read all row groups*/, std::nullopt /*filter*/); - return _current_read_chunk < _chunk_read_info.size(); + + auto const num_input_passes = + _input_pass_row_group_indices.size() == 0 ? 0 : _input_pass_row_group_indices.size() - 1; + return (_pass_itm_data->current_output_chunk < _pass_itm_data->output_chunk_read_info.size()) || + (_current_input_pass < num_input_passes); } namespace { diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index a980670e465..0337b64a4ac 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -90,8 +90,8 @@ class reader::impl { * ``` * * Reading the whole given file at once through `read()` function is still supported if - * `chunk_read_limit == 0` (i.e., no reading limit). - * In such case, `read_chunk()` will also return rows of the entire file. + * `chunk_read_limit == 0` (i.e., no reading limit) and `pass_read_limit == 0` (no temporary + * memory limit) In such case, `read_chunk()` will also return rows of the entire file. * * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit @@ -101,6 +101,7 @@ class reader::impl { * @param mr Device memory resource to use for device memory allocation */ explicit impl(std::size_t chunk_read_limit, + std::size_t pass_read_limit, std::vector>&& sources, parquet_reader_options const& options, rmm::cuda_stream_view stream, @@ -133,22 +134,22 @@ class reader::impl { host_span const> row_group_indices, std::optional> filter); + void load_global_chunk_info(); + void compute_input_pass_row_group_info(); + void setup_pass(); + /** * @brief Create chunk information and start file reads * - * @param row_groups_info vector of information about row groups to read - * @param num_rows Maximum number of rows to read * @return pair of boolean indicating if compressed chunks were found and a vector of futures for * read completion */ - std::pair>> create_and_read_column_chunks( - cudf::host_span const row_groups_info, size_type num_rows); + std::pair>> read_and_decompress_column_chunks(); /** * @brief Load and decompress the input file(s) into memory. */ - void load_and_decompress_data(cudf::host_span const row_groups_info, - size_type num_rows); + void load_and_decompress_data(); /** * @brief Perform some preprocessing for page data and also compute the split locations @@ -161,17 +162,12 @@ class reader::impl { * * For flat schemas, these values are computed during header decoding (see gpuDecodePageHeaders). * - * @param skip_rows Crop all rows below skip_rows - * @param num_rows Maximum number of rows to read * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific * bounds * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit */ - void preprocess_pages(size_t skip_rows, - size_t num_rows, - bool uses_custom_row_bounds, - size_t chunk_read_limit); + void preprocess_pages(bool uses_custom_row_bounds, size_t chunk_read_limit); /** * @brief Allocate nesting information storage for all pages and set pointers to it. @@ -278,12 +274,28 @@ class reader::impl { std::optional> _reader_column_schema; data_type _timestamp_type{type_id::EMPTY}; - // Variables used for chunked reading: + // chunked reading happens in 2 parts: + // + // At the top level there is the "pass" in which we try and limit the + // total amount of temporary memory (compressed data, decompressed data) in use + // via _input_pass_read_limit. + // + // Within a pass, we produce one or more chunks of output, whose maximum total + // byte size is controlled by _output_chunk_read_limit. + cudf::io::parquet::gpu::file_intermediate_data _file_itm_data; - cudf::io::parquet::gpu::chunk_intermediate_data _chunk_itm_data; - std::vector _chunk_read_info; - std::size_t _chunk_read_limit{0}; - std::size_t _current_read_chunk{0}; + std::unique_ptr _pass_itm_data; + + // an array of indices into _file_itm_data::global_chunks. Each pair of indices represents + // the start/end of the chunks to be loaded for a given pass. + std::vector _input_pass_row_group_indices{}; + std::vector _input_pass_row_count{}; + std::size_t _current_input_pass{0}; + std::size_t _chunk_count{0}; + + std::size_t _output_chunk_read_limit{0}; + std::size_t _input_pass_read_limit{0}; + bool _pass_preprocessed{false}; bool _file_preprocessed{false}; }; diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index f6dbeb275fc..fcaa610fbb7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -344,7 +344,7 @@ std::vector aggregate_reader_metadata::get_pandas_index_names() con return names; } -std::tuple> +std::tuple> aggregate_reader_metadata::select_row_groups( host_span const> row_group_indices, int64_t skip_rows_opt, @@ -362,7 +362,7 @@ aggregate_reader_metadata::select_row_groups( host_span const>(filtered_row_group_indices.value()); } } - std::vector selection; + std::vector selection; auto [rows_to_skip, rows_to_read] = [&]() { if (not row_group_indices.empty()) { return std::pair{}; } auto const from_opts = cudf::io::detail::skip_rows_num_rows_from_options( diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 751ffc33123..61e4f94df0f 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -53,19 +53,6 @@ using namespace cudf::io::parquet; : data_type{t_id}; } -/** - * @brief The row_group_info class - */ -struct row_group_info { - size_type const index; - size_t const start_row; // TODO source index - size_type const source_index; - row_group_info(size_type index, size_t start_row, size_type source_index) - : index(index), start_row(start_row), source_index(source_index) - { - } -}; - /** * @brief Class for parsing dataset metadata */ @@ -194,7 +181,7 @@ class aggregate_reader_metadata { * @return A tuple of corrected row_start, row_count and list of row group indexes and its * starting row */ - [[nodiscard]] std::tuple> select_row_groups( + [[nodiscard]] std::tuple> select_row_groups( host_span const> row_group_indices, int64_t row_start, std::optional const& row_count, diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index bde73c3dd96..cc31c4fe49d 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -577,10 +577,10 @@ int decode_page_headers(cudf::detail::hostdevice_vector& c void reader::impl::allocate_nesting_info() { - auto const& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto& page_nesting_info = _file_itm_data.page_nesting_info; - auto& page_nesting_decode_info = _file_itm_data.page_nesting_decode_info; + auto const& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; + auto& page_nesting_info = _pass_itm_data->page_nesting_info; + auto& page_nesting_decode_info = _pass_itm_data->page_nesting_decode_info; // compute total # of page_nesting infos needed and allocate space. doing this in one // buffer to keep it to a single gpu allocation @@ -702,38 +702,39 @@ void reader::impl::allocate_nesting_info() void reader::impl::allocate_level_decode_space() { - auto& pages = _file_itm_data.pages_info; + auto& pages = _pass_itm_data->pages_info; // TODO: this could be made smaller if we ignored dictionary pages and pages with no // repetition data. size_t const per_page_decode_buf_size = - LEVEL_DECODE_BUF_SIZE * 2 * _file_itm_data.level_type_size; + LEVEL_DECODE_BUF_SIZE * 2 * _pass_itm_data->level_type_size; auto const decode_buf_size = per_page_decode_buf_size * pages.size(); - _file_itm_data.level_decode_data = + _pass_itm_data->level_decode_data = rmm::device_buffer(decode_buf_size, _stream, rmm::mr::get_current_device_resource()); // distribute the buffers - uint8_t* buf = static_cast(_file_itm_data.level_decode_data.data()); + uint8_t* buf = static_cast(_pass_itm_data->level_decode_data.data()); for (size_t idx = 0; idx < pages.size(); idx++) { auto& p = pages[idx]; p.lvl_decode_buf[gpu::level_type::DEFINITION] = buf; - buf += (LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size); + buf += (LEVEL_DECODE_BUF_SIZE * _pass_itm_data->level_type_size); p.lvl_decode_buf[gpu::level_type::REPETITION] = buf; - buf += (LEVEL_DECODE_BUF_SIZE * _file_itm_data.level_type_size); + buf += (LEVEL_DECODE_BUF_SIZE * _pass_itm_data->level_type_size); } } -std::pair>> reader::impl::create_and_read_column_chunks( - cudf::host_span const row_groups_info, size_type num_rows) +std::pair>> reader::impl::read_and_decompress_column_chunks() { - auto& raw_page_data = _file_itm_data.raw_page_data; - auto& chunks = _file_itm_data.chunks; + auto row_groups_info = _pass_itm_data->row_groups; + auto const num_rows = _pass_itm_data->num_rows; + + auto& raw_page_data = _pass_itm_data->raw_page_data; + auto& chunks = _pass_itm_data->chunks; // Descriptors for all the chunks that make up the selected columns auto const num_input_columns = _input_columns.size(); auto const num_chunks = row_groups_info.size() * num_input_columns; - chunks = cudf::detail::hostdevice_vector(0, num_chunks, _stream); // Association between each column chunk and its source std::vector chunk_source_map(num_chunks); @@ -747,13 +748,68 @@ std::pair>> reader::impl::create_and_read_co // Initialize column chunk information size_t total_decompressed_size = 0; auto remaining_rows = num_rows; - std::vector> read_rowgroup_tasks; + std::vector> read_chunk_tasks; + size_type chunk_count = 0; for (auto const& rg : row_groups_info) { auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); - auto const row_group_start = rg.start_row; auto const row_group_source = rg.source_index; auto const row_group_rows = std::min(remaining_rows, row_group.num_rows); + // generate ColumnChunkDesc objects for everything to be decoded (all input columns) + for (size_t i = 0; i < num_input_columns; ++i) { + auto col = _input_columns[i]; + // look up metadata + auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx); + + column_chunk_offsets[chunk_count] = + (col_meta.dictionary_page_offset != 0) + ? std::min(col_meta.data_page_offset, col_meta.dictionary_page_offset) + : col_meta.data_page_offset; + + // Map each column chunk to its column index and its source index + chunk_source_map[chunk_count] = row_group_source; + + if (col_meta.codec != Compression::UNCOMPRESSED) { + total_decompressed_size += col_meta.total_uncompressed_size; + } + + chunk_count++; + } + remaining_rows -= row_group_rows; + } + + // Read compressed chunk data to device memory + read_chunk_tasks.push_back(read_column_chunks_async(_sources, + raw_page_data, + chunks, + 0, + chunks.size(), + column_chunk_offsets, + chunk_source_map, + _stream)); + + CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read."); + + return {total_decompressed_size > 0, std::move(read_chunk_tasks)}; +} + +void reader::impl::load_global_chunk_info() +{ + auto const num_rows = _file_itm_data.global_num_rows; + auto row_groups_info = _file_itm_data.row_groups; + auto& chunks = _file_itm_data.chunks; + + // Descriptors for all the chunks that make up the selected columns + auto const num_input_columns = _input_columns.size(); + auto const num_chunks = row_groups_info.size() * num_input_columns; + + // Initialize column chunk information + auto remaining_rows = num_rows; + for (auto const& rg : row_groups_info) { + auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); + auto const row_group_start = rg.start_row; + auto const row_group_rows = std::min(remaining_rows, row_group.num_rows); + // generate ColumnChunkDesc objects for everything to be decoded (all input columns) for (size_t i = 0; i < num_input_columns; ++i) { auto col = _input_columns[i]; @@ -768,11 +824,6 @@ std::pair>> reader::impl::create_and_read_co schema.converted_type, schema.type_length); - column_chunk_offsets[chunks.size()] = - (col_meta.dictionary_page_offset != 0) - ? std::min(col_meta.data_page_offset, col_meta.dictionary_page_offset) - : col_meta.data_page_offset; - chunks.push_back(gpu::ColumnChunkDesc(col_meta.total_compressed_size, nullptr, col_meta.num_values, @@ -792,92 +843,180 @@ std::pair>> reader::impl::create_and_read_co clock_rate, i, col.schema_idx)); + } - // Map each column chunk to its column index and its source index - chunk_source_map[chunks.size() - 1] = row_group_source; + remaining_rows -= row_group_rows; + } +} - if (col_meta.codec != Compression::UNCOMPRESSED) { - total_decompressed_size += col_meta.total_uncompressed_size; +void reader::impl::compute_input_pass_row_group_info() +{ + // at this point, row_groups has already been filtered down to just the row groups we need to + // handle optional skip_rows/num_rows parameters. + auto row_groups_info = _file_itm_data.row_groups; + + // if the user hasn't specified an input size limit, read everything in a single pass. + if (_input_pass_read_limit == 0) { + _input_pass_row_group_indices.push_back(0); + _input_pass_row_group_indices.push_back(row_groups_info.size()); + return; + } + + // generate passes. make sure to account for the case where a single row group doesn't fit within + // + std::size_t const read_limit = + _input_pass_read_limit > 0 ? _input_pass_read_limit : std::numeric_limits::max(); + std::size_t cur_read = 0; + std::size_t cur_rg_start = 0; + std::size_t cur_rg_index = 0; + std::size_t cur_row_count = 0; + _input_pass_row_group_indices.push_back(0); + _input_pass_row_count.push_back(0); + for (auto const& rg : row_groups_info) { + auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); + + // if we're past the read limit, add another pass + if (cur_read + row_group.total_byte_size >= read_limit) { + // always need to add at least 1 row group + if (cur_rg_start == cur_rg_index) { + _input_pass_row_group_indices.push_back(cur_rg_index + 1); + _input_pass_row_count.push_back(cur_row_count + row_group.num_rows); + cur_rg_start = cur_rg_index + 1; + } else { + _input_pass_row_group_indices.push_back(cur_rg_index); + _input_pass_row_count.push_back(cur_row_count); + cur_rg_start = cur_rg_index; } + cur_read = 0; + cur_rg_index = cur_rg_start; + } else { + cur_read += row_group.total_byte_size; + cur_rg_index++; } - remaining_rows -= row_group_rows; - } - // Read compressed chunk data to device memory - read_rowgroup_tasks.push_back(read_column_chunks_async(_sources, - raw_page_data, - chunks, - 0, - chunks.size(), - column_chunk_offsets, - chunk_source_map, - _stream)); + cur_row_count += row_group.num_rows; + } + // add the last pass if necessary + if (_input_pass_row_group_indices.back() != row_groups_info.size()) { + _input_pass_row_group_indices.push_back(row_groups_info.size()); + _input_pass_row_count.push_back(cur_row_count); + } - CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read."); + for (size_t idx = 0; idx < _input_pass_row_group_indices.size(); idx++) { + printf("Pass(%lu): rgi(%lu), row_count(%lu)\n", + idx, + _input_pass_row_group_indices[idx], + _input_pass_row_count[idx]); + } +} - return {total_decompressed_size > 0, std::move(read_rowgroup_tasks)}; +void reader::impl::setup_pass() +{ + // this will also cause the previous pass information to be deleted + _pass_itm_data = std::make_unique(); + + // setup row groups to be loaded for this pass + auto const row_group_start = _input_pass_row_group_indices[_current_input_pass]; + auto const row_group_end = _input_pass_row_group_indices[_current_input_pass + 1]; + auto const num_row_groups = row_group_end - row_group_start; + _pass_itm_data->row_groups.resize(num_row_groups); + std::copy(_file_itm_data.row_groups.begin() + row_group_start, + _file_itm_data.row_groups.begin() + row_group_end, + _pass_itm_data->row_groups.begin()); + + auto const num_passes = _input_pass_row_group_indices.size() - 1; + CUDF_EXPECTS(_current_input_pass < num_passes, "Encountered an invalid read pass index"); + + auto const chunks_per_rowgroup = _input_columns.size(); + auto const num_chunks = chunks_per_rowgroup * num_row_groups; + + auto chunk_start = _file_itm_data.chunks.begin() + (row_group_start * chunks_per_rowgroup); + auto chunk_end = _file_itm_data.chunks.begin() + (row_group_end * chunks_per_rowgroup); + + _pass_itm_data->chunks = + cudf::detail::hostdevice_vector(0, num_chunks, _stream); + _pass_itm_data->chunks.resize(num_chunks, _stream); + std::copy(chunk_start, chunk_end, _pass_itm_data->chunks.begin()); + + // adjust skip_rows and num_rows by what's available in the row groups we are processing + if (num_passes == 1) { + _pass_itm_data->skip_rows = _file_itm_data.global_skip_rows; + _pass_itm_data->num_rows = _file_itm_data.global_num_rows; + } else { + auto const global_start_row = _file_itm_data.global_skip_rows; + auto const global_end_row = global_start_row + _file_itm_data.global_num_rows; + auto const start_row = std::max(_input_pass_row_count[row_group_start], global_start_row); + auto const end_row = std::min(_input_pass_row_count[row_group_end], global_end_row); + + // skip_rows is always global in the sense that it is relative to the first row of + // everything we will be reading, regardless of what pass we are on. + // num_rows is how many rows we are reading this pass. + // _pass_itm_data->skip_rows = start_row - _input_pass_row_count[row_group_start]; + _pass_itm_data->skip_rows = global_start_row + _input_pass_row_count[row_group_start]; + _pass_itm_data->num_rows = end_row - start_row; + + int whee = 10; + whee++; + } } -void reader::impl::load_and_decompress_data( - cudf::host_span const row_groups_info, size_type num_rows) +void reader::impl::load_and_decompress_data() { // This function should never be called if `num_rows == 0`. - CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero."); + CUDF_EXPECTS(_pass_itm_data->num_rows > 0, "Number of reading rows must not be zero."); - auto& raw_page_data = _file_itm_data.raw_page_data; - auto& decomp_page_data = _file_itm_data.decomp_page_data; - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; + auto& raw_page_data = _pass_itm_data->raw_page_data; + auto& decomp_page_data = _pass_itm_data->decomp_page_data; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; - auto const [has_compressed_data, read_rowgroup_tasks] = - create_and_read_column_chunks(row_groups_info, num_rows); + auto const [has_compressed_data, read_chunks_tasks] = read_and_decompress_column_chunks(); - for (auto& task : read_rowgroup_tasks) { + for (auto& task : read_chunks_tasks) { task.wait(); } // Process dataset chunk pages into output columns auto const total_pages = count_page_headers(chunks, _stream); + if (total_pages <= 0) { return; } pages = cudf::detail::hostdevice_vector(total_pages, total_pages, _stream); - if (total_pages > 0) { - // decoding of column/page information - _file_itm_data.level_type_size = decode_page_headers(chunks, pages, _stream); - if (has_compressed_data) { - decomp_page_data = decompress_page_data(chunks, pages, _stream); - // Free compressed data - for (size_t c = 0; c < chunks.size(); c++) { - if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); } - } + // decoding of column/page information + _pass_itm_data->level_type_size = decode_page_headers(chunks, pages, _stream); + if (has_compressed_data) { + decomp_page_data = decompress_page_data(chunks, pages, _stream); + // Free compressed data + for (size_t c = 0; c < chunks.size(); c++) { + if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); } } + } - // build output column info - // walk the schema, building out_buffers that mirror what our final cudf columns will look - // like. important : there is not necessarily a 1:1 mapping between input columns and output - // columns. For example, parquet does not explicitly store a ColumnChunkDesc for struct - // columns. The "structiness" is simply implied by the schema. For example, this schema: - // required group field_id=1 name { - // required binary field_id=2 firstname (String); - // required binary field_id=3 middlename (String); - // required binary field_id=4 lastname (String); - // } - // will only contain 3 columns of data (firstname, middlename, lastname). But of course - // "name" is a struct column that we want to return, so we have to make sure that we - // create it ourselves. - // std::vector output_info = build_output_column_info(); - - // the following two allocate functions modify the page data - pages.device_to_host_sync(_stream); - { - // nesting information (sizes, etc) stored -per page- - // note : even for flat schemas, we allocate 1 level of "nesting" info - allocate_nesting_info(); + // build output column info + // walk the schema, building out_buffers that mirror what our final cudf columns will look + // like. important : there is not necessarily a 1:1 mapping between input columns and output + // columns. For example, parquet does not explicitly store a ColumnChunkDesc for struct + // columns. The "structiness" is simply implied by the schema. For example, this schema: + // required group field_id=1 name { + // required binary field_id=2 firstname (String); + // required binary field_id=3 middlename (String); + // required binary field_id=4 lastname (String); + // } + // will only contain 3 columns of data (firstname, middlename, lastname). But of course + // "name" is a struct column that we want to return, so we have to make sure that we + // create it ourselves. + // std::vector output_info = build_output_column_info(); + + // the following two allocate functions modify the page data + pages.device_to_host_sync(_stream); + { + // nesting information (sizes, etc) stored -per page- + // note : even for flat schemas, we allocate 1 level of "nesting" info + allocate_nesting_info(); - // level decode space - allocate_level_decode_space(); - } - pages.host_to_device_async(_stream); + // level decode space + allocate_level_decode_space(); } + pages.host_to_device_async(_stream); } namespace { @@ -1183,7 +1322,7 @@ std::vector find_splits(std::vector c */ std::vector compute_splits( cudf::detail::hostdevice_vector& pages, - gpu::chunk_intermediate_data const& id, + gpu::pass_intermediate_data const& id, size_t num_rows, size_t chunk_read_limit, rmm::cuda_stream_view stream) @@ -1539,13 +1678,12 @@ struct page_offset_output_iter { } // anonymous namespace -void reader::impl::preprocess_pages(size_t skip_rows, - size_t num_rows, - bool uses_custom_row_bounds, - size_t chunk_read_limit) +void reader::impl::preprocess_pages(bool uses_custom_row_bounds, size_t chunk_read_limit) { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; + auto const skip_rows = _pass_itm_data->skip_rows; + auto const num_rows = _pass_itm_data->num_rows; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; // compute page ordering. // @@ -1636,7 +1774,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, // Build index for string dictionaries since they can't be indexed // directly due to variable-sized elements - _chunk_itm_data.str_dict_index = + _pass_itm_data->str_dict_index = cudf::detail::make_zeroed_device_uvector_async( total_str_dict_indexes, _stream, rmm::mr::get_current_device_resource()); @@ -1646,7 +1784,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, CUDF_EXPECTS(input_col.schema_idx == chunks[c].src_col_schema, "Column/page schema index mismatch"); if (is_dict_chunk(chunks[c])) { - chunks[c].str_dict_index = _chunk_itm_data.str_dict_index.data() + str_ofs; + chunks[c].str_dict_index = _pass_itm_data->str_dict_index.data() + str_ofs; str_ofs += pages[page_count].num_input_values; } @@ -1677,7 +1815,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, std::numeric_limits::max(), true, // compute num_rows chunk_read_limit > 0, // compute string sizes - _file_itm_data.level_type_size, + _pass_itm_data->level_type_size, _stream); // computes: @@ -1699,20 +1837,21 @@ void reader::impl::preprocess_pages(size_t skip_rows, } // preserve page ordering data for string decoder - _chunk_itm_data.page_keys = std::move(page_keys); - _chunk_itm_data.page_index = std::move(page_index); + _pass_itm_data->page_keys = std::move(page_keys); + _pass_itm_data->page_index = std::move(page_index); // compute splits if necessary. otherwise return a single split representing // the whole file. - _chunk_read_info = chunk_read_limit > 0 - ? compute_splits(pages, _chunk_itm_data, num_rows, chunk_read_limit, _stream) - : std::vector{{skip_rows, num_rows}}; + _pass_itm_data->output_chunk_read_info = + _output_chunk_read_limit > 0 + ? compute_splits(pages, *_pass_itm_data, num_rows, chunk_read_limit, _stream) + : std::vector{{skip_rows, num_rows}}; } void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds) { - auto const& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; + auto const& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; // Should not reach here if there is no page data. CUDF_EXPECTS(pages.size() > 0, "There is no page to parse"); @@ -1729,7 +1868,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses num_rows, false, // num_rows is already computed false, // no need to compute string sizes - _file_itm_data.level_type_size, + _pass_itm_data->level_type_size, _stream); // print_pages(pages, _stream); @@ -1766,7 +1905,7 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // compute output column sizes by examining the pages of the -input- columns if (has_lists) { - auto& page_index = _chunk_itm_data.page_index; + auto& page_index = _pass_itm_data->page_index; std::vector h_cols_info; h_cols_info.reserve(_input_columns.size()); @@ -1846,10 +1985,10 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses std::vector reader::impl::calculate_page_string_offsets() { - auto& chunks = _file_itm_data.chunks; - auto& pages = _file_itm_data.pages_info; - auto const& page_keys = _chunk_itm_data.page_keys; - auto const& page_index = _chunk_itm_data.page_index; + auto& chunks = _pass_itm_data->chunks; + auto& pages = _pass_itm_data->pages_info; + auto const& page_keys = _pass_itm_data->page_keys; + auto const& page_index = _pass_itm_data->page_index; std::vector col_sizes(_input_columns.size(), 0L); rmm::device_uvector d_col_sizes(col_sizes.size(), _stream); diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index a6a93c41472..38f18daa80e 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -96,6 +96,14 @@ class hostdevice_vector { [[nodiscard]] size_t size_bytes() const noexcept { return sizeof(T) * size(); } [[nodiscard]] bool empty() const noexcept { return size() == 0; } + void resize(size_t new_size, rmm::cuda_stream_view stream) + { + CUDF_EXPECTS(new_size <= capacity(), + "hostdevice_vector resize must fit within existing capacity"); + stream.synchronize(); + current_size = new_size; + } + [[nodiscard]] T& operator[](size_t i) { return host_data[i]; } [[nodiscard]] T const& operator[](size_t i) const { return host_data[i]; } From 8bca138a510fe39cb6d6ec2a205e69416131e4e6 Mon Sep 17 00:00:00 2001 From: db Date: Wed, 20 Sep 2023 16:00:48 -0500 Subject: [PATCH 2/5] Bug fixes + tests. --- cpp/src/io/parquet/reader_impl_preprocess.cu | 39 +++++------- cpp/tests/io/parquet_chunked_reader_test.cpp | 65 +++++++++++++++++++- 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index fdfff168ca6..44d1227b67a 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -868,32 +868,34 @@ void reader::impl::compute_input_pass_row_group_info() _input_pass_read_limit > 0 ? _input_pass_read_limit : std::numeric_limits::max(); std::size_t cur_read = 0; std::size_t cur_rg_start = 0; - std::size_t cur_rg_index = 0; std::size_t cur_row_count = 0; _input_pass_row_group_indices.push_back(0); _input_pass_row_count.push_back(0); - for (auto const& rg : row_groups_info) { - auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); - // if we're past the read limit, add another pass + // for (auto const& rg : row_groups_info) { + for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { + auto const& rgi = row_groups_info[cur_rg_index]; + auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); + + // can we add this guy if (cur_read + row_group.total_byte_size >= read_limit) { - // always need to add at least 1 row group + // always need to add at least 1 row group, so add ourselves if (cur_rg_start == cur_rg_index) { _input_pass_row_group_indices.push_back(cur_rg_index + 1); _input_pass_row_count.push_back(cur_row_count + row_group.num_rows); cur_rg_start = cur_rg_index + 1; - } else { + cur_read = 0; + } + // add the previous group + else { _input_pass_row_group_indices.push_back(cur_rg_index); _input_pass_row_count.push_back(cur_row_count); cur_rg_start = cur_rg_index; + cur_read = row_group.total_byte_size; } - cur_read = 0; - cur_rg_index = cur_rg_start; } else { cur_read += row_group.total_byte_size; - cur_rg_index++; } - cur_row_count += row_group.num_rows; } // add the last pass if necessary @@ -901,13 +903,6 @@ void reader::impl::compute_input_pass_row_group_info() _input_pass_row_group_indices.push_back(row_groups_info.size()); _input_pass_row_count.push_back(cur_row_count); } - - for (size_t idx = 0; idx < _input_pass_row_group_indices.size(); idx++) { - printf("Pass(%lu): rgi(%lu), row_count(%lu)\n", - idx, - _input_pass_row_group_indices[idx], - _input_pass_row_count[idx]); - } } void reader::impl::setup_pass() @@ -945,18 +940,14 @@ void reader::impl::setup_pass() } else { auto const global_start_row = _file_itm_data.global_skip_rows; auto const global_end_row = global_start_row + _file_itm_data.global_num_rows; - auto const start_row = std::max(_input_pass_row_count[row_group_start], global_start_row); - auto const end_row = std::min(_input_pass_row_count[row_group_end], global_end_row); + auto const start_row = std::max(_input_pass_row_count[_current_input_pass], global_start_row); + auto const end_row = std::min(_input_pass_row_count[_current_input_pass + 1], global_end_row); // skip_rows is always global in the sense that it is relative to the first row of // everything we will be reading, regardless of what pass we are on. // num_rows is how many rows we are reading this pass. - // _pass_itm_data->skip_rows = start_row - _input_pass_row_count[row_group_start]; - _pass_itm_data->skip_rows = global_start_row + _input_pass_row_count[row_group_start]; + _pass_itm_data->skip_rows = global_start_row + _input_pass_row_count[_current_input_pass]; _pass_itm_data->num_rows = end_row - start_row; - - int whee = 10; - whee++; } } diff --git a/cpp/tests/io/parquet_chunked_reader_test.cpp b/cpp/tests/io/parquet_chunked_reader_test.cpp index 9815304b965..744c1d94527 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cpp +++ b/cpp/tests/io/parquet_chunked_reader_test.cpp @@ -100,11 +100,13 @@ auto write_file(std::vector>& input_columns, return std::pair{std::move(input_table), std::move(filepath)}; } -auto chunked_read(std::string const& filepath, std::size_t byte_limit) +auto chunked_read(std::string const& filepath, + std::size_t output_limit, + std::size_t input_limit = 0) { auto const read_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build(); - auto reader = cudf::io::chunked_parquet_reader(byte_limit, read_opts); + auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts); auto num_chunks = 0; auto out_tables = std::vector>{}; @@ -950,3 +952,62 @@ TEST_F(ParquetChunkedReaderTest, TestChunkedReadNullCount) EXPECT_EQ(reader.read_chunk().tbl->get_column(0).null_count(), page_limit_rows / 4); } while (reader.has_next()); } + +TEST_F(ParquetChunkedReaderTest, InputLimitSimple) +{ + auto const filepath = temp_env->get_temp_filepath("input_limit_10_rowgroups.parquet"); + + // This results in 10 grow groups, at 4001150 bytes per row group + constexpr int num_rows = 25'000'000; + auto value_iter = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i; }); + cudf::test::fixed_width_column_wrapper expected(value_iter, value_iter + num_rows); + cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder( + cudf::io::sink_info{filepath}, cudf::table_view{{expected}}); + cudf::io::write_parquet(opts); + + // Note: some of these tests make explicit assumptions that the compressed size of the data in + // each row group will be 4001150. Changes to compression or other defaults may cause them to + // break (just requiring some tweaks). + + { + // no chunking + auto const [result, num_chunks] = chunked_read(filepath, 0, 0); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 25 chunks of 100k rows each + auto const [result, num_chunks] = chunked_read(filepath, 0, 1); + EXPECT_EQ(num_chunks, 25); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 25 chunks of 100k rows each + auto const [result, num_chunks] = chunked_read(filepath, 0, 4000000); + EXPECT_EQ(num_chunks, 25); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 25 chunks of 100k rows each + auto const [result, num_chunks] = chunked_read(filepath, 0, 4100000); + EXPECT_EQ(num_chunks, 25); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 12 chunks of 200k rows each, plus 1 final chunk of 100k rows. + auto const [result, num_chunks] = chunked_read(filepath, 0, 8002301); + EXPECT_EQ(num_chunks, 13); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } + + { + // 1 big chunk + auto const [result, num_chunks] = chunked_read(filepath, 0, size_t{1} * 1024 * 1024 * 1024); + EXPECT_EQ(num_chunks, 1); + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->get_column(0)); + } +} From 2ecec0f9a934e986d62e3cb32cc14a1f2ae42520 Mon Sep 17 00:00:00 2001 From: db Date: Tue, 26 Sep 2023 13:44:41 -0500 Subject: [PATCH 3/5] Doc updates. Fixed a couple places where we were inadvertently copying a vector instead of referencing it. --- cpp/src/io/parquet/parquet_gpu.hpp | 4 ++-- cpp/src/io/parquet/reader_impl.hpp | 2 ++ cpp/src/io/parquet/reader_impl_preprocess.cu | 13 ++++++------- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 56c8db2de2c..01355817fa7 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -310,13 +310,13 @@ struct ColumnChunkDesc { */ struct row_group_info { size_type index; - size_t start_row; // TODO source index + size_t start_row; size_type source_index; row_group_info() = default; row_group_info(size_type index, size_t start_row, size_type source_index) - : index(index), start_row(start_row), source_index(source_index) + : index{index}, start_row{start_row}, source_index{source_index} { } }; diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 0337b64a4ac..7fdd7e4c1d4 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -95,6 +95,8 @@ class reader::impl { * * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit + * @param pass_read_limit Limit on memory usage for the purposes of decompression and processing + * of input, or `0` if there is no limit. * @param sources Dataset sources * @param options Settings for controlling reading behavior * @param stream CUDA stream used for device memory operations and kernel launches diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 44d1227b67a..e65403b1a19 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -726,8 +726,8 @@ void reader::impl::allocate_level_decode_space() std::pair>> reader::impl::read_and_decompress_column_chunks() { - auto row_groups_info = _pass_itm_data->row_groups; - auto const num_rows = _pass_itm_data->num_rows; + auto const& row_groups_info = _pass_itm_data->row_groups; + auto const num_rows = _pass_itm_data->num_rows; auto& raw_page_data = _pass_itm_data->raw_page_data; auto& chunks = _pass_itm_data->chunks; @@ -795,9 +795,9 @@ std::pair>> reader::impl::read_and_decompres void reader::impl::load_global_chunk_info() { - auto const num_rows = _file_itm_data.global_num_rows; - auto row_groups_info = _file_itm_data.row_groups; - auto& chunks = _file_itm_data.chunks; + auto const num_rows = _file_itm_data.global_num_rows; + auto const& row_groups_info = _file_itm_data.row_groups; + auto& chunks = _file_itm_data.chunks; // Descriptors for all the chunks that make up the selected columns auto const num_input_columns = _input_columns.size(); @@ -853,7 +853,7 @@ void reader::impl::compute_input_pass_row_group_info() { // at this point, row_groups has already been filtered down to just the row groups we need to // handle optional skip_rows/num_rows parameters. - auto row_groups_info = _file_itm_data.row_groups; + auto const& row_groups_info = _file_itm_data.row_groups; // if the user hasn't specified an input size limit, read everything in a single pass. if (_input_pass_read_limit == 0) { @@ -872,7 +872,6 @@ void reader::impl::compute_input_pass_row_group_info() _input_pass_row_group_indices.push_back(0); _input_pass_row_count.push_back(0); - // for (auto const& rg : row_groups_info) { for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { auto const& rgi = row_groups_info[cur_rg_index]; auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); From d37e6a71288650e6986be66ba5270f4a0b31d15c Mon Sep 17 00:00:00 2001 From: db Date: Wed, 27 Sep 2023 11:16:43 -0500 Subject: [PATCH 4/5] PR review changes. Primarily doc updates. Also removed the change to hostdevice_vector as it was unnecessary. --- cpp/include/cudf/io/detail/parquet.hpp | 30 ++++++++++++++++++-- cpp/include/cudf/io/parquet.hpp | 4 +-- cpp/src/io/parquet/parquet_gpu.hpp | 8 +++--- cpp/src/io/parquet/reader_impl.cpp | 4 +-- cpp/src/io/parquet/reader_impl.hpp | 4 +-- cpp/src/io/parquet/reader_impl_preprocess.cu | 27 +++++++++--------- cpp/src/io/utilities/hostdevice_vector.hpp | 8 ------ 7 files changed, 50 insertions(+), 35 deletions(-) diff --git a/cpp/include/cudf/io/detail/parquet.hpp b/cpp/include/cudf/io/detail/parquet.hpp index 152e28b9a14..074f690d2c7 100644 --- a/cpp/include/cudf/io/detail/parquet.hpp +++ b/cpp/include/cudf/io/detail/parquet.hpp @@ -107,13 +107,37 @@ class chunked_reader : private reader { * temporary memory size limit) a call to `read_chunk()` will read the whole file and return a * table containing all rows. * + * The chunk_read_limit parameter controls the size of the output chunks produces. If the user + * specifies 100 MB of data, the reader will attempt to return chunks containing tables that have + * a total bytes size (over all columns) of 100 MB or less. This is a soft limit and the code + * will not fail if it cannot satisfy the limit. It will make a best-effort atttempt only. + * + * The pass_read_limit parameter controls how much temporary memory is used in the process of + * decoding the file. The primary contributor to this memory usage is the uncompressed size of + * the data read out of the file and the decompressed (but not yet decoded) size of the data. The + * granularity of a given pass is at the row group level. It will not attempt to read at the sub + * row-group level. + * + * Combined, the way to visualize passes and chunks is as follows: + * + * @code{.pseudo} + * for(each pass){ + * for(each output chunk within a pass){ + * return a table that fits within the output chunk limit + * } + * } + * @endcode + * + * With a pass_read_limit of `0` you are simply saying you have one pass that reads the entire + * file as normal. + * * @param chunk_read_limit Limit on total number of bytes to be returned per read, - * or `0` if there is no limit + * or `0` if there is no limit * @param pass_read_limit Limit on total amount of memory used for temporary computations during - * loading, or `0` if there is no limit. + * loading, or `0` if there is no limit * @param sources Input `datasource` objects to read the dataset from * @param options Settings for controlling reading behavior - * @param stream CUDA stream used for device memory operations and kernel launches. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ explicit chunked_reader(std::size_t chunk_read_limit, diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index dd6304a43e2..deaf23d405a 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -457,9 +457,9 @@ class chunked_parquet_reader { * loaded. * * @param chunk_read_limit Limit on total number of bytes to be returned per read, - * or `0` if there is no limit + * or `0` if there is no limit * @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or - * `0` if there is no limit. + * `0` if there is no limit * @param options The options used to read Parquet file * @param mr Device memory resource to use for device memory allocation */ diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 01355817fa7..bfe82d71dd6 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -309,9 +309,10 @@ struct ColumnChunkDesc { * @brief The row_group_info class */ struct row_group_info { - size_type index; + size_type index; // row group index within a file. aggregate_reader_metadata::get_row_group() is + // called with index and source_index size_t start_row; - size_type source_index; + size_type source_index; // file index. row_group_info() = default; @@ -348,8 +349,7 @@ struct chunk_read_info { }; /** - * @brief Struct to store pass-level data that remains constant for - * a single pass. + * @brief Struct to store pass-level data that remains constant for a single pass. */ struct pass_intermediate_data { std::vector> raw_page_data; diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8c52d92d720..c7f0d20ba9c 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -353,7 +353,7 @@ void reader::impl::prepare_data(int64_t skip_rows, // if we have to start a new pass, do that now if (!_pass_preprocessed) { - auto const num_passes = _input_pass_row_group_indices.size() - 1; + auto const num_passes = _input_pass_row_group_offsets.size() - 1; // always create the pass struct, even if we end up with no passes. // this will also cause the previous pass information to be deleted @@ -531,7 +531,7 @@ bool reader::impl::has_next() std::nullopt /*filter*/); auto const num_input_passes = - _input_pass_row_group_indices.size() == 0 ? 0 : _input_pass_row_group_indices.size() - 1; + _input_pass_row_group_offsets.size() == 0 ? 0 : _input_pass_row_group_offsets.size() - 1; return (_pass_itm_data->current_output_chunk < _pass_itm_data->output_chunk_read_info.size()) || (_current_input_pass < num_input_passes); } diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 7fdd7e4c1d4..9445e4d1648 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -288,9 +288,9 @@ class reader::impl { cudf::io::parquet::gpu::file_intermediate_data _file_itm_data; std::unique_ptr _pass_itm_data; - // an array of indices into _file_itm_data::global_chunks. Each pair of indices represents + // an array of offsets into _file_itm_data::global_chunks. Each pair of offsets represents // the start/end of the chunks to be loaded for a given pass. - std::vector _input_pass_row_group_indices{}; + std::vector _input_pass_row_group_offsets{}; std::vector _input_pass_row_count{}; std::size_t _current_input_pass{0}; std::size_t _chunk_count{0}; diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index e65403b1a19..59a5785e80d 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -757,7 +757,7 @@ std::pair>> reader::impl::read_and_decompres // generate ColumnChunkDesc objects for everything to be decoded (all input columns) for (size_t i = 0; i < num_input_columns; ++i) { - auto col = _input_columns[i]; + auto const& col = _input_columns[i]; // look up metadata auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx); @@ -857,8 +857,8 @@ void reader::impl::compute_input_pass_row_group_info() // if the user hasn't specified an input size limit, read everything in a single pass. if (_input_pass_read_limit == 0) { - _input_pass_row_group_indices.push_back(0); - _input_pass_row_group_indices.push_back(row_groups_info.size()); + _input_pass_row_group_offsets.push_back(0); + _input_pass_row_group_offsets.push_back(row_groups_info.size()); return; } @@ -869,25 +869,25 @@ void reader::impl::compute_input_pass_row_group_info() std::size_t cur_read = 0; std::size_t cur_rg_start = 0; std::size_t cur_row_count = 0; - _input_pass_row_group_indices.push_back(0); + _input_pass_row_group_offsets.push_back(0); _input_pass_row_count.push_back(0); for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { auto const& rgi = row_groups_info[cur_rg_index]; auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); - // can we add this guy + // can we add this row group if (cur_read + row_group.total_byte_size >= read_limit) { // always need to add at least 1 row group, so add ourselves if (cur_rg_start == cur_rg_index) { - _input_pass_row_group_indices.push_back(cur_rg_index + 1); + _input_pass_row_group_offsets.push_back(cur_rg_index + 1); _input_pass_row_count.push_back(cur_row_count + row_group.num_rows); cur_rg_start = cur_rg_index + 1; cur_read = 0; } // add the previous group else { - _input_pass_row_group_indices.push_back(cur_rg_index); + _input_pass_row_group_offsets.push_back(cur_rg_index); _input_pass_row_count.push_back(cur_row_count); cur_rg_start = cur_rg_index; cur_read = row_group.total_byte_size; @@ -898,8 +898,8 @@ void reader::impl::compute_input_pass_row_group_info() cur_row_count += row_group.num_rows; } // add the last pass if necessary - if (_input_pass_row_group_indices.back() != row_groups_info.size()) { - _input_pass_row_group_indices.push_back(row_groups_info.size()); + if (_input_pass_row_group_offsets.back() != row_groups_info.size()) { + _input_pass_row_group_offsets.push_back(row_groups_info.size()); _input_pass_row_count.push_back(cur_row_count); } } @@ -910,15 +910,15 @@ void reader::impl::setup_pass() _pass_itm_data = std::make_unique(); // setup row groups to be loaded for this pass - auto const row_group_start = _input_pass_row_group_indices[_current_input_pass]; - auto const row_group_end = _input_pass_row_group_indices[_current_input_pass + 1]; + auto const row_group_start = _input_pass_row_group_offsets[_current_input_pass]; + auto const row_group_end = _input_pass_row_group_offsets[_current_input_pass + 1]; auto const num_row_groups = row_group_end - row_group_start; _pass_itm_data->row_groups.resize(num_row_groups); std::copy(_file_itm_data.row_groups.begin() + row_group_start, _file_itm_data.row_groups.begin() + row_group_end, _pass_itm_data->row_groups.begin()); - auto const num_passes = _input_pass_row_group_indices.size() - 1; + auto const num_passes = _input_pass_row_group_offsets.size() - 1; CUDF_EXPECTS(_current_input_pass < num_passes, "Encountered an invalid read pass index"); auto const chunks_per_rowgroup = _input_columns.size(); @@ -928,8 +928,7 @@ void reader::impl::setup_pass() auto chunk_end = _file_itm_data.chunks.begin() + (row_group_end * chunks_per_rowgroup); _pass_itm_data->chunks = - cudf::detail::hostdevice_vector(0, num_chunks, _stream); - _pass_itm_data->chunks.resize(num_chunks, _stream); + cudf::detail::hostdevice_vector(num_chunks, _stream); std::copy(chunk_start, chunk_end, _pass_itm_data->chunks.begin()); // adjust skip_rows and num_rows by what's available in the row groups we are processing diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index 38f18daa80e..a6a93c41472 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -96,14 +96,6 @@ class hostdevice_vector { [[nodiscard]] size_t size_bytes() const noexcept { return sizeof(T) * size(); } [[nodiscard]] bool empty() const noexcept { return size() == 0; } - void resize(size_t new_size, rmm::cuda_stream_view stream) - { - CUDF_EXPECTS(new_size <= capacity(), - "hostdevice_vector resize must fit within existing capacity"); - stream.synchronize(); - current_size = new_size; - } - [[nodiscard]] T& operator[](size_t i) { return host_data[i]; } [[nodiscard]] T const& operator[](size_t i) const { return host_data[i]; } From 167e664b01b0c0cbfb6406a99860065b8e306681 Mon Sep 17 00:00:00 2001 From: db Date: Wed, 27 Sep 2023 17:48:32 -0500 Subject: [PATCH 5/5] PR review changes. Explicitly force the writer to not use dictionary encoding to keep the hardcoded uncompressed size predictable. --- cpp/src/io/parquet/parquet_gpu.hpp | 4 ++-- cpp/src/io/parquet/reader_impl_preprocess.cu | 24 +++++++++++--------- cpp/tests/io/parquet_chunked_reader_test.cpp | 15 +++++++----- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 4e7b4cd68d8..572fe5c2820 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -369,8 +369,8 @@ struct pass_intermediate_data { std::vector output_chunk_read_info; std::size_t current_output_chunk{0}; - rmm::device_buffer level_decode_data; - int level_type_size; + rmm::device_buffer level_decode_data{}; + int level_type_size{0}; // skip_rows and num_rows values for this particular pass. these may be adjusted values from the // global values stored in file_intermediate_data. diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 59a5785e80d..c731c467f2c 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -866,9 +866,9 @@ void reader::impl::compute_input_pass_row_group_info() // std::size_t const read_limit = _input_pass_read_limit > 0 ? _input_pass_read_limit : std::numeric_limits::max(); - std::size_t cur_read = 0; - std::size_t cur_rg_start = 0; - std::size_t cur_row_count = 0; + std::size_t cur_pass_byte_size = 0; + std::size_t cur_rg_start = 0; + std::size_t cur_row_count = 0; _input_pass_row_group_offsets.push_back(0); _input_pass_row_count.push_back(0); @@ -877,23 +877,25 @@ void reader::impl::compute_input_pass_row_group_info() auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); // can we add this row group - if (cur_read + row_group.total_byte_size >= read_limit) { - // always need to add at least 1 row group, so add ourselves + if (cur_pass_byte_size + row_group.total_byte_size >= read_limit) { + // A single row group (the current one) is larger than the read limit: + // We always need to include at least one row group, so end the pass at the end of the current + // row group if (cur_rg_start == cur_rg_index) { _input_pass_row_group_offsets.push_back(cur_rg_index + 1); _input_pass_row_count.push_back(cur_row_count + row_group.num_rows); - cur_rg_start = cur_rg_index + 1; - cur_read = 0; + cur_rg_start = cur_rg_index + 1; + cur_pass_byte_size = 0; } - // add the previous group + // End the pass at the end of the previous row group else { _input_pass_row_group_offsets.push_back(cur_rg_index); _input_pass_row_count.push_back(cur_row_count); - cur_rg_start = cur_rg_index; - cur_read = row_group.total_byte_size; + cur_rg_start = cur_rg_index; + cur_pass_byte_size = row_group.total_byte_size; } } else { - cur_read += row_group.total_byte_size; + cur_pass_byte_size += row_group.total_byte_size; } cur_row_count += row_group.num_rows; } diff --git a/cpp/tests/io/parquet_chunked_reader_test.cpp b/cpp/tests/io/parquet_chunked_reader_test.cpp index 744c1d94527..05fb9a3ec48 100644 --- a/cpp/tests/io/parquet_chunked_reader_test.cpp +++ b/cpp/tests/io/parquet_chunked_reader_test.cpp @@ -961,13 +961,16 @@ TEST_F(ParquetChunkedReaderTest, InputLimitSimple) constexpr int num_rows = 25'000'000; auto value_iter = cudf::detail::make_counting_transform_iterator(0, [](int i) { return i; }); cudf::test::fixed_width_column_wrapper expected(value_iter, value_iter + num_rows); - cudf::io::parquet_writer_options opts = cudf::io::parquet_writer_options::builder( - cudf::io::sink_info{filepath}, cudf::table_view{{expected}}); - cudf::io::write_parquet(opts); + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, + cudf::table_view{{expected}}) + // note: it is unnecessary to force compression to NONE here because the size we are using in + // the row group is the uncompressed data size. But forcing the dictionary policy to + // dictionary_policy::NEVER is necessary to prevent changes in the + // decompressed-but-not-yet-decoded data. + .dictionary_policy(cudf::io::dictionary_policy::NEVER); - // Note: some of these tests make explicit assumptions that the compressed size of the data in - // each row group will be 4001150. Changes to compression or other defaults may cause them to - // break (just requiring some tweaks). + cudf::io::write_parquet(opts); { // no chunking