Skip to content

Commit

Permalink
Prevent overflow with skip_rows in ORC and Parquet readers (#13063)
Browse files Browse the repository at this point in the history
Use int64_t for `skip_rows`, since a single source, or several sources combined, can have more than two billion rows, and we should be able to read a range of rows even in that case.
Store `num_rows` as `std::optional`, instead of using special value (`-1`).
Share the error-prone row selection logic between the ORC and Parquet readers instead of duplicating it.
Added unit tests for the tricky code above.
Converted inout `select_stripes` parameters to input params + return values.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Robert Maynard (https://github.com/robertmaynard)
  - https://github.com/brandon-b-miller
  - Yunsong Wang (https://github.com/PointKernel)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #13063
  • Loading branch information
vuule authored Apr 13, 2023
1 parent d415ffe commit f77403e
Show file tree
Hide file tree
Showing 20 changed files with 373 additions and 127 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ add_library(
src/io/utilities/datasource.cpp
src/io/utilities/file_io_utilities.cpp
src/io/utilities/parsing_utils.cu
src/io/utilities/row_selection.cpp
src/io/utilities/trie.cu
src/io/utilities/type_conversion.cpp
src/jit/cache.cpp
Expand Down
48 changes: 37 additions & 11 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ class orc_reader_options {

// List of individual stripes to read (ignored if empty)
std::vector<std::vector<size_type>> _stripes;
// Rows to skip from the start;
size_type _skip_rows = 0;
// Rows to read; -1 is all
size_type _num_rows = -1;
// Rows to skip from the start; ORC stores the number of rows as uint64_t
uint64_t _skip_rows = 0;
// Rows to read; `nullopt` is all
std::optional<size_type> _num_rows;

// Whether to use row index to speed-up reading
bool _use_index = true;
Expand Down Expand Up @@ -124,14 +124,15 @@ class orc_reader_options {
*
* @return Number of rows to skip from the start
*/
size_type get_skip_rows() const { return _skip_rows; }
uint64_t get_skip_rows() const { return _skip_rows; }

/**
* @brief Returns number of rows to read.
*
* @return Number of row to read
* @return Number of rows to read; `nullopt` if the option hasn't been set (in which case the file
* is read until the end)
*/
size_type get_num_rows() const { return _num_rows; }
std::optional<size_type> const& get_num_rows() const { return _num_rows; }

/**
* @brief Whether to use row index to speed-up reading.
Expand Down Expand Up @@ -174,20 +175,29 @@ class orc_reader_options {
* @brief Sets list of stripes to read for each input source
*
* @param stripes Vector of vectors, mapping stripes to read to input sources
*
* @throw cudf::logic_error if a non-empty vector is passed, and `skip_rows` has been previously
* set
* @throw cudf::logic_error if a non-empty vector is passed, and `num_rows` has been previously
* set
*/
void set_stripes(std::vector<std::vector<size_type>> stripes)
{
CUDF_EXPECTS(stripes.empty() or (_skip_rows == 0), "Can't set stripes along with skip_rows");
CUDF_EXPECTS(stripes.empty() or (_num_rows == -1), "Can't set stripes along with num_rows");
CUDF_EXPECTS(stripes.empty() or not _num_rows.has_value(),
"Can't set stripes along with num_rows");
_stripes = std::move(stripes);
}

/**
* @brief Sets number of rows to skip from the start.
*
* @param rows Number of rows
*
* @throw cudf::logic_error if a non-zero value is passed, and stripes have been previously set
*/
void set_skip_rows(size_type rows)
void set_skip_rows(uint64_t rows)
{
CUDF_EXPECTS(rows == 0 or _stripes.empty(), "Can't set both skip_rows along with stripes");
_skip_rows = rows;
Expand All @@ -197,10 +207,14 @@ class orc_reader_options {
* @brief Sets number of rows to read.
*
* @param nrows Number of rows
*
* @throw cudf::logic_error if a negative value is passed
* @throw cudf::logic_error if stripes have been previously set
*/
void set_num_rows(size_type nrows)
{
CUDF_EXPECTS(nrows == -1 or _stripes.empty(), "Can't set both num_rows along with stripes");
CUDF_EXPECTS(nrows >= 0, "num_rows cannot be negative");
CUDF_EXPECTS(_stripes.empty(), "Can't set both num_rows and stripes");
_num_rows = nrows;
}

Expand Down Expand Up @@ -287,7 +301,7 @@ class orc_reader_options_builder {
* @param rows Number of rows
* @return this for chaining
*/
orc_reader_options_builder& skip_rows(size_type rows)
orc_reader_options_builder& skip_rows(uint64_t rows)
{
options.set_skip_rows(rows);
return *this;
Expand Down Expand Up @@ -571,6 +585,8 @@ class orc_writer_options {
* @brief Sets the maximum stripe size, in bytes.
*
* @param size_bytes Maximum stripe size, in bytes to be set
*
* @throw cudf::logic_error if a value below the minimal size is passed
*/
void set_stripe_size_bytes(size_t size_bytes)
{
Expand All @@ -585,6 +601,8 @@ class orc_writer_options {
* the stripe size.
*
* @param size_rows Maximum stripe size, in rows to be set
*
* @throw cudf::logic_error if a value below the minimal number of rows is passed
*/
void set_stripe_size_rows(size_type size_rows)
{
Expand All @@ -598,6 +616,8 @@ class orc_writer_options {
* Rounded down to a multiple of 8.
*
* @param stride Row index stride to be set
*
* @throw cudf::logic_error if a value below the minimal row index stride is passed
*/
void set_row_index_stride(size_type stride)
{
Expand Down Expand Up @@ -924,6 +944,8 @@ class chunked_orc_writer_options {
* @brief Sets the maximum stripe size, in bytes.
*
* @param size_bytes Maximum stripe size, in bytes to be set
*
* @throw cudf::logic_error if a value below the minimal stripe size is passed
*/
void set_stripe_size_bytes(size_t size_bytes)
{
Expand All @@ -938,6 +960,8 @@ class chunked_orc_writer_options {
* the stripe size.
*
* @param size_rows Maximum stripe size, in rows to be set
*
* @throw cudf::logic_error if a value below the minimal number of rows in a stripe is passed
*/
void set_stripe_size_rows(size_type size_rows)
{
Expand All @@ -951,6 +975,8 @@ class chunked_orc_writer_options {
* Rounded down to a multiple of 8.
*
* @param stride Row index stride to be set
*
* @throw cudf::logic_error if a value below the minimal number of rows in a row group is passed
*/
void set_row_index_stride(size_type stride)
{
Expand Down
19 changes: 10 additions & 9 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ class parquet_reader_options {

// List of individual row groups to read (ignored if empty)
std::vector<std::vector<size_type>> _row_groups;
// Number of rows to skip from the start
size_type _skip_rows = 0;
// Number of rows to read; -1 is all
size_type _num_rows = -1;
// Number of rows to skip from the start; Parquet stores the number of rows as int64_t
int64_t _skip_rows = 0;
// Number of rows to read; `nullopt` is all
std::optional<size_type> _num_rows;

// Whether to store string data as categorical type
bool _convert_strings_to_categories = false;
Expand Down Expand Up @@ -136,14 +136,15 @@ class parquet_reader_options {
*
* @return Number of rows to skip from the start
*/
[[nodiscard]] size_type get_skip_rows() const { return _skip_rows; }
[[nodiscard]] int64_t get_skip_rows() const { return _skip_rows; }

/**
* @brief Returns number of rows to read.
*
* @return Number of rows to read
* @return Number of rows to read; `nullopt` if the option hasn't been set (in which case the file
* is read until the end)
*/
[[nodiscard]] size_type get_num_rows() const { return _num_rows; }
[[nodiscard]] std::optional<size_type> const& get_num_rows() const { return _num_rows; }

/**
* @brief Returns names of column to be read, if set.
Expand Down Expand Up @@ -210,7 +211,7 @@ class parquet_reader_options {
*
* @param val Number of rows to skip from start
*/
void set_skip_rows(size_type val);
void set_skip_rows(int64_t val);

/**
* @brief Sets number of rows to read.
Expand Down Expand Up @@ -314,7 +315,7 @@ class parquet_reader_options_builder {
* @param val Number of rows to skip from start
* @return this for chaining
*/
parquet_reader_options_builder& skip_rows(size_type val)
parquet_reader_options_builder& skip_rows(int64_t val)
{
options.set_skip_rows(val);
return *this;
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,27 +603,25 @@ std::unique_ptr<std::vector<uint8_t>> parquet_chunked_writer::close(

void parquet_reader_options::set_row_groups(std::vector<std::vector<size_type>> row_groups)
{
if ((!row_groups.empty()) and ((_skip_rows != 0) or (_num_rows != -1))) {
if ((!row_groups.empty()) and ((_skip_rows != 0) or _num_rows.has_value())) {
CUDF_FAIL("row_groups can't be set along with skip_rows and num_rows");
}

_row_groups = std::move(row_groups);
}

void parquet_reader_options::set_skip_rows(size_type val)
void parquet_reader_options::set_skip_rows(int64_t val)
{
if ((val != 0) and (!_row_groups.empty())) {
CUDF_FAIL("skip_rows can't be set along with a non-empty row_groups");
}
CUDF_EXPECTS(val >= 0, "skip_rows cannot be negative");
CUDF_EXPECTS(_row_groups.empty(), "skip_rows can't be set along with a non-empty row_groups");

_skip_rows = val;
}

void parquet_reader_options::set_num_rows(size_type val)
{
if ((val != -1) and (!_row_groups.empty())) {
CUDF_FAIL("num_rows can't be set along with a non-empty row_groups");
}
CUDF_EXPECTS(val >= 0, "num_rows cannot be negative");
CUDF_EXPECTS(_row_groups.empty(), "num_rows can't be set along with a non-empty row_groups");

_num_rows = val;
}
Expand Down
52 changes: 26 additions & 26 deletions cpp/src/io/orc/aggregate_orc_metadata.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

#include "aggregate_orc_metadata.hpp"

#include <io/utilities/row_selection.hpp>

#include <algorithm>
#include <numeric>
#include <optional>
Expand Down Expand Up @@ -106,10 +108,10 @@ auto metadatas_from_sources(std::vector<std::unique_ptr<datasource>> const& sour

} // namespace

size_type aggregate_orc_metadata::calc_num_rows() const
int64_t aggregate_orc_metadata::calc_num_rows() const
{
return std::accumulate(
per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto const& sum, auto const& pfm) {
per_file_metadata.begin(), per_file_metadata.end(), 0l, [](auto const& sum, auto const& pfm) {
return sum + pfm.get_total_rows();
});
}
Expand Down Expand Up @@ -151,22 +153,29 @@ aggregate_orc_metadata::aggregate_orc_metadata(
}
}

std::vector<metadata::stripe_source_mapping> aggregate_orc_metadata::select_stripes(
std::tuple<int64_t, size_type, std::vector<metadata::stripe_source_mapping>>
aggregate_orc_metadata::select_stripes(
std::vector<std::vector<size_type>> const& user_specified_stripes,
size_type& row_start,
size_type& row_count,
int64_t skip_rows_opt,
std::optional<size_type> const& num_rows_opt,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(
(skip_rows_opt == 0 and not num_rows_opt.has_value()) or user_specified_stripes.empty(),
"Can't use both the row selection and the stripe selection");

auto [rows_to_skip, rows_to_read] = [&]() {
if (not user_specified_stripes.empty()) { return std::pair<uint64_t, size_type>{0, 0}; }
return cudf::io::detail::skip_rows_num_rows_from_options(
skip_rows_opt, num_rows_opt, get_num_rows());
}();

std::vector<metadata::stripe_source_mapping> selected_stripes_mapping;

if (!user_specified_stripes.empty()) {
CUDF_EXPECTS(user_specified_stripes.size() == per_file_metadata.size(),
"Must specify stripes for each source");
// row_start is 0 if stripes are set. If this is not true anymore, then
// row_start needs to be subtracted to get the correct row_count
CUDF_EXPECTS(row_start == 0, "Start row index should be 0");

row_count = 0;
// Each vector entry represents a source file; each nested vector represents the
// user_defined_stripes to get from that source file
for (size_t src_file_idx = 0; src_file_idx < user_specified_stripes.size(); ++src_file_idx) {
Expand All @@ -181,33 +190,24 @@ std::vector<metadata::stripe_source_mapping> aggregate_orc_metadata::select_stri
"Invalid stripe index");
stripe_infos.push_back(
std::pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr));
row_count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows;
rows_to_read += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows;
}
selected_stripes_mapping.push_back({static_cast<int>(src_file_idx), stripe_infos});
}
} else {
row_start = std::max(row_start, 0);
if (row_count < 0) {
row_count = static_cast<size_type>(
std::min<int64_t>(get_num_rows(), std::numeric_limits<size_type>::max()));
}
row_count = std::min(row_count, get_num_rows() - row_start);
CUDF_EXPECTS(row_count >= 0, "Invalid row count");
CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start");

size_type count = 0;
uint64_t count = 0;
size_type stripe_skip_rows = 0;
// Iterate over all source files; each source file has corresponding metadata
for (size_t src_file_idx = 0;
src_file_idx < per_file_metadata.size() && count < row_start + row_count;
src_file_idx < per_file_metadata.size() && count < rows_to_skip + rows_to_read;
++src_file_idx) {
std::vector<OrcStripeInfo> stripe_infos;

for (size_t stripe_idx = 0; stripe_idx < per_file_metadata[src_file_idx].ff.stripes.size() &&
count < row_start + row_count;
count < rows_to_skip + rows_to_read;
++stripe_idx) {
count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows;
if (count > row_start || count == 0) {
if (count > rows_to_skip || count == 0) {
stripe_infos.push_back(
std::pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr));
} else {
Expand All @@ -218,7 +218,7 @@ std::vector<metadata::stripe_source_mapping> aggregate_orc_metadata::select_stri
selected_stripes_mapping.push_back({static_cast<int>(src_file_idx), stripe_infos});
}
// Need to remove skipped rows from the stripes which are not selected.
row_start -= stripe_skip_rows;
rows_to_skip -= stripe_skip_rows;
}

// Read each stripe's stripefooter metadata
Expand Down Expand Up @@ -246,7 +246,7 @@ std::vector<metadata::stripe_source_mapping> aggregate_orc_metadata::select_stri
}
}

return selected_stripes_mapping;
return {rows_to_skip, rows_to_read, selected_stripes_mapping};
}

column_hierarchy aggregate_orc_metadata::select_columns(
Expand Down
Loading

0 comments on commit f77403e

Please sign in to comment.