diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7d6e31d039e..1cb97b0ab69 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -395,6 +395,7 @@ add_library( src/io/utilities/datasource.cpp src/io/utilities/file_io_utilities.cpp src/io/utilities/parsing_utils.cu + src/io/utilities/row_selection.cpp src/io/utilities/trie.cu src/io/utilities/type_conversion.cpp src/jit/cache.cpp diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index 9dac915118f..e3abbe6056f 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -57,10 +57,10 @@ class orc_reader_options { // List of individual stripes to read (ignored if empty) std::vector> _stripes; - // Rows to skip from the start; - size_type _skip_rows = 0; - // Rows to read; -1 is all - size_type _num_rows = -1; + // Rows to skip from the start; ORC stores the number of rows as uint64_t + uint64_t _skip_rows = 0; + // Rows to read; `nullopt` is all + std::optional _num_rows; // Whether to use row index to speed-up reading bool _use_index = true; @@ -124,14 +124,15 @@ class orc_reader_options { * * @return Number of rows to skip from the start */ - size_type get_skip_rows() const { return _skip_rows; } + uint64_t get_skip_rows() const { return _skip_rows; } /** * @brief Returns number of row to read. * - * @return Number of row to read + * @return Number of rows to read; `nullopt` if the option hasn't been set (in which case the file + * is read until the end) */ - size_type get_num_rows() const { return _num_rows; } + std::optional const& get_num_rows() const { return _num_rows; } /** * @brief Whether to use row index to speed-up reading. @@ -174,11 +175,17 @@ class orc_reader_options { * @brief Sets list of stripes to read for each input source * * @param stripes Vector of vectors, mapping stripes to read to input sources + * + * @throw cudf::logic_error if a non-empty vector is passed, and `skip_rows` has been previously + * set + * @throw cudf::logic_error if a non-empty vector is passed, and `num_rows` has been previously + * set */ void set_stripes(std::vector> stripes) { CUDF_EXPECTS(stripes.empty() or (_skip_rows == 0), "Can't set stripes along with skip_rows"); - CUDF_EXPECTS(stripes.empty() or (_num_rows == -1), "Can't set stripes along with num_rows"); + CUDF_EXPECTS(stripes.empty() or not _num_rows.has_value(), + "Can't set stripes along with num_rows"); _stripes = std::move(stripes); } @@ -186,8 +193,11 @@ class orc_reader_options { * @brief Sets number of rows to skip from the start. * * @param rows Number of rows + * + * @throw cudf::logic_error if a negative value is passed + * @throw cudf::logic_error if stripes have been previously set */ - void set_skip_rows(size_type rows) + void set_skip_rows(uint64_t rows) { CUDF_EXPECTS(rows == 0 or _stripes.empty(), "Can't set both skip_rows along with stripes"); _skip_rows = rows; @@ -197,10 +207,14 @@ class orc_reader_options { * @brief Sets number of row to read. * * @param nrows Number of rows + * + * @throw cudf::logic_error if a negative value is passed + * @throw cudf::logic_error if stripes have been previously set */ void set_num_rows(size_type nrows) { - CUDF_EXPECTS(nrows == -1 or _stripes.empty(), "Can't set both num_rows along with stripes"); + CUDF_EXPECTS(nrows >= 0, "num_rows cannot be negative"); + CUDF_EXPECTS(_stripes.empty(), "Can't set both num_rows and stripes"); _num_rows = nrows; } @@ -287,7 +301,7 @@ class orc_reader_options_builder { * @param rows Number of rows * @return this for chaining */ - orc_reader_options_builder& skip_rows(size_type rows) + orc_reader_options_builder& skip_rows(uint64_t rows) { options.set_skip_rows(rows); return *this; @@ -571,6 +585,8 @@ class orc_writer_options { * @brief Sets the maximum stripe size, in bytes. * * @param size_bytes Maximum stripe size, in bytes to be set + * + * @throw cudf::logic_error if a value below the minimal size is passed */ void set_stripe_size_bytes(size_t size_bytes) { @@ -585,6 +601,8 @@ class orc_writer_options { * the stripe size. * * @param size_rows Maximum stripe size, in rows to be set + * + * @throw cudf::logic_error if a value below the minimal number of rows is passed */ void set_stripe_size_rows(size_type size_rows) { @@ -598,6 +616,8 @@ class orc_writer_options { * Rounded down to a multiple of 8. * * @param stride Row index stride to be set + * + * @throw cudf::logic_error if a value below the minimal row index stride is passed */ void set_row_index_stride(size_type stride) { @@ -924,6 +944,8 @@ class chunked_orc_writer_options { * @brief Sets the maximum stripe size, in bytes. * * @param size_bytes Maximum stripe size, in bytes to be set + * + * @throw cudf::logic_error if a value below the minimal stripe size is passed */ void set_stripe_size_bytes(size_t size_bytes) { @@ -938,6 +960,8 @@ class chunked_orc_writer_options { * the stripe size. * * @param size_rows Maximum stripe size, in rows to be set + * + * @throw cudf::logic_error if a value below the minimal number of rows in a stripe is passed */ void set_stripe_size_rows(size_type size_rows) { @@ -951,6 +975,8 @@ class chunked_orc_writer_options { * Rounded down to a multiple of 8. * * @param stride Row index stride to be set + * + * @throw cudf::logic_error if a value below the minimal number of rows in a row group is passed */ void set_row_index_stride(size_type stride) { diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 53ce1be817c..07d41e3b132 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -57,10 +57,10 @@ class parquet_reader_options { // List of individual row groups to read (ignored if empty) std::vector> _row_groups; - // Number of rows to skip from the start - size_type _skip_rows = 0; - // Number of rows to read; -1 is all - size_type _num_rows = -1; + // Number of rows to skip from the start; Parquet stores the number of rows as int64_t + int64_t _skip_rows = 0; + // Number of rows to read; `nullopt` is all + std::optional _num_rows; // Whether to store string data as categorical type bool _convert_strings_to_categories = false; @@ -136,14 +136,15 @@ class parquet_reader_options { * * @return Number of rows to skip from the start */ - [[nodiscard]] size_type get_skip_rows() const { return _skip_rows; } + [[nodiscard]] int64_t get_skip_rows() const { return _skip_rows; } /** * @brief Returns number of rows to read. * - * @return Number of rows to read + * @return Number of rows to read; `nullopt` if the option hasn't been set (in which case the file + * is read until the end) */ - [[nodiscard]] size_type get_num_rows() const { return _num_rows; } + [[nodiscard]] std::optional const& get_num_rows() const { return _num_rows; } /** * @brief Returns names of column to be read, if set. @@ -210,7 +211,7 @@ class parquet_reader_options { * * @param val Number of rows to skip from start */ - void set_skip_rows(size_type val); + void set_skip_rows(int64_t val); /** * @brief Sets number of rows to read. @@ -314,7 +315,7 @@ class parquet_reader_options_builder { * @param val Number of rows to skip from start * @return this for chaining */ - parquet_reader_options_builder& skip_rows(size_type val) + parquet_reader_options_builder& skip_rows(int64_t val) { options.set_skip_rows(val); return *this; diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 8f6903e558d..76c50d548f9 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -603,27 +603,25 @@ std::unique_ptr> parquet_chunked_writer::close( void parquet_reader_options::set_row_groups(std::vector> row_groups) { - if ((!row_groups.empty()) and ((_skip_rows != 0) or (_num_rows != -1))) { + if ((!row_groups.empty()) and ((_skip_rows != 0) or _num_rows.has_value())) { CUDF_FAIL("row_groups can't be set along with skip_rows and num_rows"); } _row_groups = std::move(row_groups); } -void parquet_reader_options::set_skip_rows(size_type val) +void parquet_reader_options::set_skip_rows(int64_t val) { - if ((val != 0) and (!_row_groups.empty())) { - CUDF_FAIL("skip_rows can't be set along with a non-empty row_groups"); - } + CUDF_EXPECTS(val >= 0, "skip_rows cannot be negative"); + CUDF_EXPECTS(_row_groups.empty(), "skip_rows can't be set along with a non-empty row_groups"); _skip_rows = val; } void parquet_reader_options::set_num_rows(size_type val) { - if ((val != -1) and (!_row_groups.empty())) { - CUDF_FAIL("num_rows can't be set along with a non-empty row_groups"); - } + CUDF_EXPECTS(val >= 0, "num_rows cannot be negative"); + CUDF_EXPECTS(_row_groups.empty(), "num_rows can't be set along with a non-empty row_groups"); _num_rows = val; } diff --git a/cpp/src/io/orc/aggregate_orc_metadata.cpp b/cpp/src/io/orc/aggregate_orc_metadata.cpp index df3dfca5fa9..7e733a45e2f 100644 --- a/cpp/src/io/orc/aggregate_orc_metadata.cpp +++ b/cpp/src/io/orc/aggregate_orc_metadata.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ #include "aggregate_orc_metadata.hpp" +#include + #include #include #include @@ -106,10 +108,10 @@ auto metadatas_from_sources(std::vector> const& sour } // namespace -size_type aggregate_orc_metadata::calc_num_rows() const +int64_t aggregate_orc_metadata::calc_num_rows() const { return std::accumulate( - per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto const& sum, auto const& pfm) { + per_file_metadata.begin(), per_file_metadata.end(), 0l, [](auto const& sum, auto const& pfm) { return sum + pfm.get_total_rows(); }); } @@ -151,22 +153,29 @@ aggregate_orc_metadata::aggregate_orc_metadata( } } -std::vector aggregate_orc_metadata::select_stripes( +std::tuple> +aggregate_orc_metadata::select_stripes( std::vector> const& user_specified_stripes, - size_type& row_start, - size_type& row_count, + int64_t skip_rows_opt, + std::optional const& num_rows_opt, rmm::cuda_stream_view stream) { + CUDF_EXPECTS( + (skip_rows_opt == 0 and not num_rows_opt.has_value()) or user_specified_stripes.empty(), + "Can't use both the row selection and the stripe selection"); + + auto [rows_to_skip, rows_to_read] = [&]() { + if (not user_specified_stripes.empty()) { return std::pair{0, 0}; } + return cudf::io::detail::skip_rows_num_rows_from_options( + skip_rows_opt, num_rows_opt, get_num_rows()); + }(); + std::vector selected_stripes_mapping; if (!user_specified_stripes.empty()) { CUDF_EXPECTS(user_specified_stripes.size() == per_file_metadata.size(), "Must specify stripes for each source"); - // row_start is 0 if stripes are set. If this is not true anymore, then - // row_start needs to be subtracted to get the correct row_count - CUDF_EXPECTS(row_start == 0, "Start row index should be 0"); - row_count = 0; // Each vector entry represents a source file; each nested vector represents the // user_defined_stripes to get from that source file for (size_t src_file_idx = 0; src_file_idx < user_specified_stripes.size(); ++src_file_idx) { @@ -181,33 +190,24 @@ std::vector aggregate_orc_metadata::select_stri "Invalid stripe index"); stripe_infos.push_back( std::pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr)); - row_count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; + rows_to_read += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; } selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); } } else { - row_start = std::max(row_start, 0); - if (row_count < 0) { - row_count = static_cast( - std::min(get_num_rows(), std::numeric_limits::max())); - } - row_count = std::min(row_count, get_num_rows() - row_start); - CUDF_EXPECTS(row_count >= 0, "Invalid row count"); - CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); - - size_type count = 0; + uint64_t count = 0; size_type stripe_skip_rows = 0; // Iterate all source files, each source file has corelating metadata for (size_t src_file_idx = 0; - src_file_idx < per_file_metadata.size() && count < row_start + row_count; + src_file_idx < per_file_metadata.size() && count < rows_to_skip + rows_to_read; ++src_file_idx) { std::vector stripe_infos; for (size_t stripe_idx = 0; stripe_idx < per_file_metadata[src_file_idx].ff.stripes.size() && - count < row_start + row_count; + count < rows_to_skip + rows_to_read; ++stripe_idx) { count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; - if (count > row_start || count == 0) { + if (count > rows_to_skip || count == 0) { stripe_infos.push_back( std::pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr)); } else { @@ -218,7 +218,7 @@ std::vector aggregate_orc_metadata::select_stri selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); } // Need to remove skipped rows from the stripes which are not selected. - row_start -= stripe_skip_rows; + rows_to_skip -= stripe_skip_rows; } // Read each stripe's stripefooter metadata @@ -246,7 +246,7 @@ std::vector aggregate_orc_metadata::select_stri } } - return selected_stripes_mapping; + return {rows_to_skip, rows_to_read, selected_stripes_mapping}; } column_hierarchy aggregate_orc_metadata::select_columns( diff --git a/cpp/src/io/orc/aggregate_orc_metadata.hpp b/cpp/src/io/orc/aggregate_orc_metadata.hpp index 3ce1a922f31..e22ffaeb5f8 100644 --- a/cpp/src/io/orc/aggregate_orc_metadata.hpp +++ b/cpp/src/io/orc/aggregate_orc_metadata.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ class aggregate_orc_metadata { /** * @brief Sums up the number of rows of each source */ - [[nodiscard]] size_type calc_num_rows() const; + [[nodiscard]] int64_t calc_num_rows() const; /** * @brief Number of columns in a ORC file. @@ -62,7 +62,7 @@ class aggregate_orc_metadata { public: std::vector per_file_metadata; - size_type const num_rows; + int64_t const num_rows; size_type const num_stripes; bool row_grp_idx_present{true}; @@ -115,10 +115,10 @@ class aggregate_orc_metadata { * * Stripes are potentially selected from multiple files. */ - std::vector select_stripes( + std::tuple> select_stripes( std::vector> const& user_specified_stripes, - size_type& row_start, - size_type& row_count, + int64_t row_start, + std::optional const& num_rows_opt, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/orc/orc_field_reader.hpp b/cpp/src/io/orc/orc_field_reader.hpp index 2e37f008818..ccbe6553e0f 100644 --- a/cpp/src/io/orc/orc_field_reader.hpp +++ b/cpp/src/io/orc/orc_field_reader.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ /** * @file orc_field_reader.hpp * @brief Functors to encapsulate common functionality required to implement - * ProtobufWriter::read(...) functions + * ProtobufReader::read(...) functions */ namespace cudf { diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index fd3fdc74978..fbe44eff5ad 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -933,8 +933,8 @@ std::unique_ptr reader::impl::compute_timezone_table( {}, selected_stripes[0].stripe_info[0].second->writerTimezone, stream); } -table_with_metadata reader::impl::read(size_type skip_rows, - size_type num_rows, +table_with_metadata reader::impl::read(int64_t skip_rows, + std::optional num_rows, const std::vector>& stripes, rmm::cuda_stream_view stream) { @@ -956,7 +956,8 @@ table_with_metadata reader::impl::read(size_type skip_rows, return {std::make_unique
(), std::move(out_metadata)}; // Select only stripes required (aka row groups) - const auto selected_stripes = _metadata.select_stripes(stripes, skip_rows, num_rows, stream); + auto const [rows_to_skip, rows_to_read, selected_stripes] = + _metadata.select_stripes(stripes, skip_rows, num_rows, stream); auto const tz_table = compute_timezone_table(selected_stripes, stream); @@ -994,7 +995,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, } // If no rows or stripes to read, return empty columns - if (num_rows <= 0 || selected_stripes.empty()) { + if (rows_to_read == 0 || selected_stripes.empty()) { std::transform(selected_columns.levels[0].begin(), selected_columns.levels[0].end(), std::back_inserter(out_columns), @@ -1023,11 +1024,12 @@ table_with_metadata reader::impl::read(size_type skip_rows, _metadata.is_row_grp_idx_present() && // Only use if we don't have much work with complete columns & stripes // TODO: Consider nrows, gpu, and tune the threshold - (num_rows > _metadata.get_row_index_stride() && !(_metadata.get_row_index_stride() & 7) && - _metadata.get_row_index_stride() > 0 && num_columns * total_num_stripes < 8 * 128) && + (rows_to_read > _metadata.get_row_index_stride() && + !(_metadata.get_row_index_stride() & 7) && _metadata.get_row_index_stride() > 0 && + num_columns * total_num_stripes < 8 * 128) && // Only use if first row is aligned to a stripe boundary // TODO: Fix logic to handle unaligned rows - (skip_rows == 0); + (rows_to_skip == 0); // Logically view streams as columns std::vector stream_info; @@ -1126,7 +1128,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, (level == 0) ? stripe_info->numberOfRows : _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx]; - chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx]; + chunk.column_num_rows = (level == 0) ? rows_to_read : _col_meta.num_child_rows[col_idx]; chunk.parent_validity_info = (level == 0) ? column_validity_info{} : _col_meta.parent_column_data[col_idx]; chunk.parent_null_count_prefix_sums = @@ -1231,7 +1233,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, } } auto is_list_type = (column_types[i].id() == type_id::LIST); - auto n_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[i]; + auto n_rows = (level == 0) ? rows_to_read : _col_meta.num_child_rows[i]; // For list column, offset column will be always size + 1 if (is_list_type) n_rows++; out_buffers[level].emplace_back(column_types[i], n_rows, is_nullable, stream, _mr); @@ -1241,7 +1243,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, auto const tz_table_dview = table_device_view::create(tz_table->view(), stream); decode_stream_data(chunks, num_dict_entries, - skip_rows, + rows_to_skip, *tz_table_dview, row_groups, _metadata.get_row_index_stride(), diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 94b0fdc09d2..822a972db25 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -97,8 +97,8 @@ class reader::impl { * * @return The set of columns along with metadata */ - table_with_metadata read(size_type skip_rows, - size_type num_rows, + table_with_metadata read(int64_t skip_rows, + std::optional num_rows, const std::vector>& stripes, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/reader.cpp b/cpp/src/io/parquet/reader.cpp index 1321e8073d7..1d01d10b5b0 100644 --- a/cpp/src/io/parquet/reader.cpp +++ b/cpp/src/io/parquet/reader.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,8 @@ reader::~reader() = default; table_with_metadata reader::read(parquet_reader_options const& options) { // if the user has specified custom row bounds - bool const uses_custom_row_bounds = options.get_num_rows() >= 0 || options.get_skip_rows() != 0; + bool const uses_custom_row_bounds = + options.get_num_rows().has_value() || options.get_skip_rows() != 0; return _impl->read(options.get_skip_rows(), options.get_num_rows(), uses_custom_row_bounds, diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index b1c4dd22c0d..d293896c204 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -236,8 +236,8 @@ reader::impl::impl(std::size_t chunk_read_limit, } } -void reader::impl::prepare_data(size_type skip_rows, - size_type num_rows, +void reader::impl::prepare_data(int64_t skip_rows, + std::optional const& num_rows, bool uses_custom_row_bounds, host_span const> row_group_indices) { @@ -332,8 +332,8 @@ table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata, return {std::make_unique
(std::move(out_columns)), std::move(out_metadata)}; } -table_with_metadata reader::impl::read(size_type skip_rows, - size_type num_rows, +table_with_metadata reader::impl::read(int64_t skip_rows, + std::optional const& num_rows, bool uses_custom_row_bounds, host_span const> row_group_indices) { @@ -354,7 +354,7 @@ table_with_metadata reader::impl::read_chunk() } prepare_data(0 /*skip_rows*/, - -1 /*num_rows, `-1` means unlimited*/, + std::nullopt /*num_rows, `nullopt` means unlimited*/, true /*uses_custom_row_bounds*/, {} /*row_group_indices, empty means read all row groups*/); return read_chunk_internal(true); @@ -363,7 +363,7 @@ table_with_metadata reader::impl::read_chunk() bool reader::impl::has_next() { prepare_data(0 /*skip_rows*/, - -1 /*num_rows, `-1` means unlimited*/, + std::nullopt /*num_rows, `nullopt` means unlimited*/, true /*uses_custom_row_bounds*/, {} /*row_group_indices, empty means read all row groups*/); return _current_read_chunk < _chunk_read_info.size(); diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 8b86412ae63..9b40610b141 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -70,8 +70,8 @@ class reader::impl { * * @return The set of columns along with metadata */ - table_with_metadata read(size_type skip_rows, - size_type num_rows, + table_with_metadata read(int64_t skip_rows, + std::optional const& num_rows, bool uses_custom_row_bounds, host_span const> row_group_indices); @@ -120,13 +120,13 @@ class reader::impl { * @brief Perform the necessary data preprocessing for parsing file later on. * * @param skip_rows Number of rows to skip from the start - * @param num_rows Number of rows to read, or `-1` to read all rows + * @param num_rows Number of rows to read, or `std::nullopt` to read all rows * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific * bounds * @param row_group_indices Lists of row groups to read (one per source), or empty if read all */ - void prepare_data(size_type skip_rows, - size_type num_rows, + void prepare_data(int64_t skip_rows, + std::optional const& num_rows, bool uses_custom_row_bounds, host_span const> row_group_indices); diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 4c2b5a324c0..a3c89c3beda 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ #include "reader_impl_helpers.hpp" +#include + #include #include @@ -227,10 +229,10 @@ aggregate_reader_metadata::collect_keyval_metadata() const return kv_maps; } -size_type aggregate_reader_metadata::calc_num_rows() const +int64_t aggregate_reader_metadata::calc_num_rows() const { return std::accumulate( - per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto& sum, auto& pfm) { + per_file_metadata.begin(), per_file_metadata.end(), 0l, [](auto& sum, auto& pfm) { return sum + pfm.num_rows; }); } @@ -336,54 +338,49 @@ std::vector aggregate_reader_metadata::get_pandas_index_names() con return names; } -std::tuple> +std::tuple> aggregate_reader_metadata::select_row_groups( host_span const> row_group_indices, - size_type row_start, - size_type row_count) const + int64_t skip_rows_opt, + std::optional const& num_rows_opt) const { std::vector selection; + auto [rows_to_skip, rows_to_read] = [&]() { + if (not row_group_indices.empty()) { return std::pair{}; } + auto const from_opts = cudf::io::detail::skip_rows_num_rows_from_options( + skip_rows_opt, num_rows_opt, get_num_rows()); + return std::pair{static_cast(from_opts.first), from_opts.second}; + }(); if (!row_group_indices.empty()) { CUDF_EXPECTS(row_group_indices.size() == per_file_metadata.size(), "Must specify row groups for each source"); - row_count = 0; for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) { for (auto const& rowgroup_idx : row_group_indices[src_idx]) { CUDF_EXPECTS( rowgroup_idx >= 0 && rowgroup_idx < static_cast(per_file_metadata[src_idx].row_groups.size()), "Invalid rowgroup index"); - selection.emplace_back(rowgroup_idx, row_count, src_idx); - row_count += get_row_group(rowgroup_idx, src_idx).num_rows; + selection.emplace_back(rowgroup_idx, rows_to_read, src_idx); + rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows; } } - - return {row_start, row_count, std::move(selection)}; - } - - row_start = std::max(row_start, 0); - if (row_count < 0) { - row_count = std::min(get_num_rows(), std::numeric_limits::max()); - } - row_count = std::min(row_count, get_num_rows() - row_start); - CUDF_EXPECTS(row_count >= 0, "Invalid row count"); - CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); - - size_type count = 0; - for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { - for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { - auto const chunk_start_row = count; - count += get_row_group(rg_idx, src_idx).num_rows; - if (count > row_start || count == 0) { - selection.emplace_back(rg_idx, chunk_start_row, src_idx); + } else { + size_type count = 0; + for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { + for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { + auto const chunk_start_row = count; + count += get_row_group(rg_idx, src_idx).num_rows; + if (count > rows_to_skip || count == 0) { + selection.emplace_back(rg_idx, chunk_start_row, src_idx); + } + if (count >= rows_to_skip + rows_to_read) { break; } } - if (count >= row_start + row_count) { break; } } } - return {row_start, row_count, std::move(selection)}; + return {rows_to_skip, rows_to_read, std::move(selection)}; } std::tuple, std::vector, std::vector> diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 6fa86a77e46..cdc5896803c 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ struct metadata : public FileMetaData { class aggregate_reader_metadata { std::vector per_file_metadata; std::vector> keyval_maps; - size_type num_rows; + int64_t num_rows; size_type num_row_groups; /** @@ -88,7 +88,7 @@ class aggregate_reader_metadata { /** * @brief Sums up the number of rows of each source */ - [[nodiscard]] size_type calc_num_rows() const; + [[nodiscard]] int64_t calc_num_rows() const; /** * @brief Sums up the number of row groups of each source @@ -169,10 +169,10 @@ class aggregate_reader_metadata { * @return A tuple of corrected row_start, row_count and list of row group indexes and its * starting row */ - [[nodiscard]] std::tuple> select_row_groups( + [[nodiscard]] std::tuple> select_row_groups( host_span const> row_group_indices, - size_type row_start, - size_type row_count) const; + int64_t row_start, + std::optional const& row_count) const; /** * @brief Filters and reduces down to a selection of columns diff --git a/cpp/src/io/utilities/row_selection.cpp b/cpp/src/io/utilities/row_selection.cpp new file mode 100644 index 00000000000..1b79a59aa9e --- /dev/null +++ b/cpp/src/io/utilities/row_selection.cpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include + +#include +#include + +namespace cudf::io::detail { + +std::pair skip_rows_num_rows_from_options( + uint64_t skip_rows_opt, std::optional const& num_rows_opt, uint64_t num_source_rows) +{ + auto const rows_to_skip = std::min(skip_rows_opt, num_source_rows); + if (not num_rows_opt.has_value()) { + CUDF_EXPECTS(num_source_rows - rows_to_skip <= std::numeric_limits::max(), + "The requested number of rows to read exceeds the largest cudf column size"); + return {rows_to_skip, num_source_rows - rows_to_skip}; + } + // Limit the number of rows to the end of the input + return {rows_to_skip, + static_cast( + std::min(num_rows_opt.value(), num_source_rows - rows_to_skip))}; +} + +} // namespace cudf::io::detail diff --git a/cpp/src/io/utilities/row_selection.hpp b/cpp/src/io/utilities/row_selection.hpp new file mode 100644 index 00000000000..66a3a83a61e --- /dev/null +++ b/cpp/src/io/utilities/row_selection.hpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +#include +#include +#include + +namespace cudf::io::detail { + +/** + * @brief Adjusts the input skip_rows and num_rows options to the actual number of rows to + * skip/read, based on the number of rows in the ORC file(s). + * + * @param skip_rows_opt skip_rows as passed by the user + * @param num_rows_opt num_rows as passed by the user + * @param num_source_rows number of rows in the ORC file(s) + * @return A std::pair containing the number of rows to skip and the number of rows to read + * + * @throw cudf::logic_error when the requested number of rows to read exceeds the largest cudf + * column size + */ +std::pair skip_rows_num_rows_from_options( + uint64_t skip_rows_opt, std::optional const& num_rows_opt, uint64_t num_source_rows); + +} // namespace cudf::io::detail diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 04cf3f838b5..4a5a12c5ccc 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -248,6 +248,7 @@ ConfigureTest( # ################################################################################################## # * io tests -------------------------------------------------------------------------------------- ConfigureTest(DECOMPRESSION_TEST io/comp/decomp_test.cpp) +ConfigureTest(ROW_SELECTION_TEST io/row_selection_test.cpp) ConfigureTest( CSV_TEST io/csv_test.cpp diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 8a16fd9a05a..ce39fd2354f 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -2603,9 +2603,7 @@ TEST_F(ParquetReaderTest, UserBounds) EXPECT_EQ(result.tbl->view().column(0).size(), 0); } - // trying to read 0 rows should result in reading the whole file - // at the moment we get back 4. when that bug gets fixed, this - // test can be flipped. + // trying to read 0 rows should result in empty columns { srand(31337); auto expected = create_random_fixed_table(4, 4, false); diff --git a/cpp/tests/io/row_selection_test.cpp b/cpp/tests/io/row_selection_test.cpp new file mode 100644 index 00000000000..243bcf4ee76 --- /dev/null +++ b/cpp/tests/io/row_selection_test.cpp @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#include + +using cudf::io::detail::skip_rows_num_rows_from_options; + +// Base test fixture for tests +struct FromOptsTest : public cudf::test::BaseFixture { +}; + +TEST_F(FromOptsTest, PassThrough) +{ + // select all rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(0, 100, 100); + EXPECT_EQ(out_skip, 0); + EXPECT_EQ(out_num, 100); + } + + // select all except first skip_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(10, 90, 100); + EXPECT_EQ(out_skip, 10); + EXPECT_EQ(out_num, 90); + } + + // select first num_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(0, 60, 100); + EXPECT_EQ(out_skip, 0); + EXPECT_EQ(out_num, 60); + } +} + +TEST_F(FromOptsTest, DefaultNumRows) +{ + // no skip_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(0, std::nullopt, 100); + EXPECT_EQ(out_skip, 0); + EXPECT_EQ(out_num, 100); + } + + // with skip_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(20, std::nullopt, 100); + EXPECT_EQ(out_skip, 20); + EXPECT_EQ(out_num, 80); + } +} + +TEST_F(FromOptsTest, InputSize32BitOverflow) +{ + // Input number of rows too large to fit into cudf::size_type + // Test that we can still select rows from such input + auto const too_large_for_32bit = std::numeric_limits::max(); + + // no num_rows + { + auto [out_skip, out_num] = + skip_rows_num_rows_from_options(too_large_for_32bit - 10, std::nullopt, too_large_for_32bit); + EXPECT_EQ(out_skip, too_large_for_32bit - 10); + EXPECT_EQ(out_num, 10); + } + + // with num_rows + { + auto [out_skip, out_num] = + skip_rows_num_rows_from_options(too_large_for_32bit - 100, 30, too_large_for_32bit); + EXPECT_EQ(out_skip, too_large_for_32bit - 100); + EXPECT_EQ(out_num, 30); + } +} + +TEST_F(FromOptsTest, LimitOptionsToFileRows) +{ + // limit skip_rows without num_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(1000, std::nullopt, 100); + EXPECT_EQ(out_skip, 100); + EXPECT_EQ(out_num, 0); + } + + // limit skip_rows with num_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(1000, 2, 100); + EXPECT_EQ(out_skip, 100); + EXPECT_EQ(out_num, 0); + } + + // limit num_rows without skip_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(0, 1000, 100); + EXPECT_EQ(out_skip, 0); + EXPECT_EQ(out_num, 100); + } + + // limit num_rows with skip_rows + { + auto [out_skip, out_num] = skip_rows_num_rows_from_options(10, 1000, 100); + EXPECT_EQ(out_skip, 10); + EXPECT_EQ(out_num, 90); + } +} + +TEST_F(FromOptsTest, OverFlowDetection) +{ + auto const too_large_for_32bit = std::numeric_limits::max(); + + // Too many rows to read until the end of the file + EXPECT_THROW(skip_rows_num_rows_from_options(0, std::nullopt, too_large_for_32bit), + cudf::logic_error); + + // Should work fine with num_rows + EXPECT_NO_THROW( + skip_rows_num_rows_from_options(1000, too_large_for_32bit - 100, too_large_for_32bit)); +} + +CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 281b2cabc52..f4159f078b9 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -341,11 +341,12 @@ cdef orc_reader_options make_orc_reader_options( orc_reader_options.builder(src) .stripes(strps) .skip_rows(skip_rows) - .num_rows(num_rows) .timestamp_type(data_type(timestamp_type)) .use_index(use_index) .build() ) + if num_rows >= 0: + opts.set_num_rows(num_rows) cdef vector[string] c_column_names if column_names is not None: