Report number of rows per file read by PQ reader when no row selection and fix segfault in chunked PQ reader when skip_rows > 0 #16195

Merged: 35 commits, Jul 20, 2024

Changes from 5 commits

Commits
c249f05  Calculate num rows read from each data source by PQ reader (mhaseeb123, Jul 4, 2024)
3891e3a  Adding gtests for num_rows_per_source (mhaseeb123, Jul 9, 2024)
4bc569e  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 9, 2024)
d3863a6  Minor updates (mhaseeb123, Jul 9, 2024)
794d59c  Merge branch 'report-nrows-per-source' of https://github.com/mhaseeb1… (mhaseeb123, Jul 9, 2024)
ebdfad5  gtests for empty dfs and minor improvements (mhaseeb123, Jul 9, 2024)
0fd6890  separate out the empty df gtest (mhaseeb123, Jul 9, 2024)
a294c18  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 9, 2024)
d268873  Add num_rows_per_source vector to the types.pxd for future use in pyt… (mhaseeb123, Jul 10, 2024)
702b0ee  Adjust for global_skip_rows while computing num_rows_per_source (mhaseeb123, Jul 11, 2024)
d031af9  Fix segfault when skip_rows > 0 and num_passes > 1 in chunked_parquet… (mhaseeb123, Jul 11, 2024)
1a207e9  Handle base cases when calculating num_rows_per_source (mhaseeb123, Jul 11, 2024)
78ed6d1  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 11, 2024)
7ad6179  Add a couple more gtests (mhaseeb123, Jul 11, 2024)
975b7c3  Revert the chunk_start_row in column_info_for_row_group the page inde… (mhaseeb123, Jul 11, 2024)
e826caf  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 11, 2024)
fa33f7a  Merge branch 'report-nrows-per-source' of https://github.com/mhaseeb1… (mhaseeb123, Jul 12, 2024)
0ac70b2  Minor bug fix (mhaseeb123, Jul 12, 2024)
bcc3bec  Remove the unreachable branch (mhaseeb123, Jul 16, 2024)
363b0da  Applying suggestions (mhaseeb123, Jul 16, 2024)
8c5816b  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 16, 2024)
e239382  Suggestions from commits (mhaseeb123, Jul 16, 2024)
6f7d203  Finally fixed the segfault when skip_rows > row group size (mhaseeb123, Jul 17, 2024)
c19e972  Revert const to row_group_info as no longer needed (mhaseeb123, Jul 17, 2024)
9208a80  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 17, 2024)
513d3bb  Fix the free() invalid pointer at chunked reader destructor (mhaseeb123, Jul 17, 2024)
c11b27d  Minor code cleanup (mhaseeb123, Jul 17, 2024)
dd19f52  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 17, 2024)
a641037  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 17, 2024)
6ae6f07  Minor code improvements. (mhaseeb123, Jul 18, 2024)
91e4735  Add helper function for include_output_num_rows_per_source (mhaseeb123, Jul 18, 2024)
ed4352a  Applying suggestions from reviews (mhaseeb123, Jul 19, 2024)
02b32ac  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 19, 2024)
71e2d4d  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 19, 2024)
1439189  Merge branch 'branch-24.08' into report-nrows-per-source (mhaseeb123, Jul 19, 2024)
2 changes: 2 additions & 0 deletions cpp/include/cudf/io/types.hpp
@@ -277,6 +277,8 @@ struct column_name_info {
struct table_metadata {
std::vector<column_name_info>
schema_info; //!< Detailed name information for the entire output hierarchy
std::vector<size_t> num_rows_per_source; //!< Number of rows read from each data source. Empty
//!< vector if AST filters are being used.
std::map<std::string, std::string> user_data; //!< Format-dependent metadata of the first input
//!< file as key-values pairs (deprecated)
std::vector<std::unordered_map<std::string, std::string>>
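The new num_rows_per_source field surfaces per-source row counts to callers of the Parquet reader. Below is a minimal sketch of how a caller might inspect it after a read; the file paths are placeholders and error handling is omitted:

```cpp
#include <cudf/io/parquet.hpp>

#include <cstddef>
#include <iostream>

// Read two Parquet files in one call and print how many rows came from each source.
void print_rows_per_source()
{
  auto const options = cudf::io::parquet_reader_options::builder(
                         cudf::io::source_info{{"a.parquet", "b.parquet"}})
                         .build();
  auto const result = cudf::io::read_parquet(options);

  // One entry per input source; empty when an AST filter was supplied, since
  // filtering makes per-source counts unavailable.
  for (std::size_t i = 0; i < result.metadata.num_rows_per_source.size(); ++i) {
    std::cout << "source " << i << ": " << result.metadata.num_rows_per_source[i] << " rows\n";
  }
}
```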
87 changes: 84 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -26,6 +26,7 @@

#include <rmm/resource_ref.hpp>

#include <thrust/binary_search.h>
#include <thrust/iterator/counting_iterator.h>

#include <bitset>
@@ -549,7 +550,24 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
out_columns.reserve(_output_buffers.size());

// no work to do (this can happen on the first pass if we have no rows to read)
if (!has_more_work()) { return finalize_output(out_metadata, out_columns); }
if (!has_more_work()) {
// Compute number of rows per source if no AST filters
if (not _expr_conv.get_converted_expr().has_value()) {
// If already empty (empty dataframe case), simply initialize to a list of zeros
if (not out_metadata.num_rows_per_source.size()) {
out_metadata.num_rows_per_source =
std::vector<size_t>(_file_itm_data.num_rows_per_source.size(), 0);
}
// If previously populated (non-empty), simply fill with zeros
else {
thrust::fill(
out_metadata.num_rows_per_source.begin(), out_metadata.num_rows_per_source.end(), 0);
}
}

// Finalize output
return finalize_output(mode, out_metadata, out_columns);
}

auto& pass = *_pass_itm_data;
auto& subpass = *pass.subpass;
@@ -585,11 +603,74 @@ table_with_metadata reader::impl::read_chunk_internal(read_mode mode)
}
}

// Compute number of rows per source if no AST filters
if (not _expr_conv.get_converted_expr().has_value()) {
// For chunked reading, compute the output number of rows per source
if (mode == read_mode::CHUNKED_READ) {
out_metadata.num_rows_per_source =
calculate_output_num_rows_per_source(read_info.skip_rows, read_info.num_rows);
}
// Simply move the number of rows per file if reading all at once
else {
// Move is okay here since we are reading in one go.
out_metadata.num_rows_per_source = std::move(_file_itm_data.num_rows_per_source);
}
}

// Add empty columns if needed. Filter output columns based on filter.
return finalize_output(out_metadata, out_columns);
return finalize_output(mode, out_metadata, out_columns);
}

std::vector<size_t> reader::impl::calculate_output_num_rows_per_source(size_t const start_row,
size_t const num_rows)
{
std::vector<size_t> num_rows_per_source(_file_itm_data.num_rows_per_source.size(), 0);

// TODO: Modify to simply end_row = start_row + num_rows once
// https://github.com/rapidsai/cudf/issues/16186 is fixed.
auto const end_row = std::min(_file_itm_data.global_skip_rows + _file_itm_data.global_num_rows,
start_row + num_rows);
CUDF_EXPECTS(start_row < end_row and
end_row <= _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows,
"Encountered invalid output chunk row bounds.");

// Copy reference to a const local variable for better readability
auto const& partial_sum_nrows_source = _file_itm_data.exclusive_sum_num_rows_per_source;

// Binary search start_row and end_row in exclusive_sum_num_rows_per_source array
auto const start_iter = thrust::upper_bound(
partial_sum_nrows_source.cbegin(), partial_sum_nrows_source.cend(), start_row);
auto const end_iter =
(end_row == _file_itm_data.global_skip_rows + _file_itm_data.global_num_rows)
? partial_sum_nrows_source.cend() - 1
: thrust::upper_bound(start_iter, partial_sum_nrows_source.cend(), end_row);

// Compute the array offset index for both iterators
auto const start_idx = std::distance(partial_sum_nrows_source.cbegin(), start_iter);
auto const end_idx = std::distance(partial_sum_nrows_source.cbegin(), end_iter);

CUDF_EXPECTS(start_idx <= end_idx,
"Encountered invalid source files indexes for output chunk row bounds");

// If the entire chunk is from the same source file, then the count is simply num_rows
if (start_idx == end_idx) {
num_rows_per_source[start_idx] = num_rows;
} else {
// Compute the number of rows from the first source file
num_rows_per_source[start_idx] = partial_sum_nrows_source[start_idx] - start_row;
// Compute the number of rows from the last source file
num_rows_per_source[end_idx] = end_row - partial_sum_nrows_source[end_idx - 1];
// Simply copy the number of rows for each source in range: (start_idx, end_idx)
thrust::copy(_file_itm_data.num_rows_per_source.cbegin() + start_idx + 1,
_file_itm_data.num_rows_per_source.cbegin() + end_idx,
num_rows_per_source.begin() + start_idx + 1);
}

return num_rows_per_source;
}

table_with_metadata reader::impl::finalize_output(table_metadata& out_metadata,
table_with_metadata reader::impl::finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns)
{
// Create empty columns as needed (this can happen if we've ended up with no actual data to read)
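The calculate_output_num_rows_per_source logic above reduces to locating the chunk's row range inside a prefix sum of per-source row counts. Below is a self-contained host-side sketch of the same idea, using std::inclusive_scan and std::upper_bound in place of the thrust calls; the numbers in the trailing example are made up for illustration:

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Split a chunk covering global rows [start_row, start_row + num_rows) across sources,
// given the number of rows each source contributes to the full read.
// Assumes at least one source and start_row + num_rows <= total rows.
std::vector<std::size_t> split_chunk_across_sources(std::vector<std::size_t> const& rows_per_source,
                                                    std::size_t start_row,
                                                    std::size_t num_rows)
{
  // Inclusive prefix sum: partial_sums[i] is the global end row (exclusive) of source i.
  std::vector<std::size_t> partial_sums(rows_per_source.size());
  std::inclusive_scan(rows_per_source.begin(), rows_per_source.end(), partial_sums.begin());

  auto const end_row = start_row + num_rows;
  std::vector<std::size_t> result(rows_per_source.size(), 0);

  // First and last sources touched by the chunk.
  auto const start_it = std::upper_bound(partial_sums.begin(), partial_sums.end(), start_row);
  auto const end_it   = (end_row == partial_sums.back())
                          ? partial_sums.end() - 1
                          : std::upper_bound(start_it, partial_sums.end(), end_row);
  auto const start_idx = static_cast<std::size_t>(start_it - partial_sums.begin());
  auto const end_idx   = static_cast<std::size_t>(end_it - partial_sums.begin());

  if (start_idx == end_idx) {
    // Whole chunk comes from one source.
    result[start_idx] = num_rows;
  } else {
    // Head: rows left in the first source; tail: rows used from the last source;
    // sources strictly in between are consumed entirely.
    result[start_idx] = partial_sums[start_idx] - start_row;
    result[end_idx]   = end_row - partial_sums[end_idx - 1];
    std::copy(rows_per_source.begin() + start_idx + 1,
              rows_per_source.begin() + end_idx,
              result.begin() + start_idx + 1);
  }
  return result;
}

// Example: sources with 10, 5, and 20 rows; a 12-row chunk starting at global row 8
// takes 2 rows from source 0, all 5 from source 1, and 5 from source 2: {2, 5, 5}.
```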
21 changes: 19 additions & 2 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -262,11 +262,13 @@ class reader::impl {
* @brief Finalize the output table by adding empty columns for the non-selected columns in
* schema.
*
* @param read_mode Value indicating if the data sources are read all at once or chunk by chunk
* @param out_metadata The output table metadata
* @param out_columns The columns for building the output table
* @return The output table along with columns' metadata
*/
table_with_metadata finalize_output(table_metadata& out_metadata,
table_with_metadata finalize_output(read_mode mode,
table_metadata& out_metadata,
std::vector<std::unique_ptr<column>>& out_columns);

/**
@@ -336,11 +338,26 @@ class reader::impl {
: true;
}

/**
* @brief Check if this is the first output chunk
*
* @return True if this is the first output chunk
*/
[[nodiscard]] bool is_first_output_chunk() const
{
return _file_itm_data._output_chunk_count == 0;
}

/**
* @brief Calculate the number of rows read from each source in the output chunk
*
* @param start_row The offset of the first row in the output chunk
* @param num_rows The number of rows in the output chunk
* @return Vector of number of rows from each respective data source in the output chunk
*/
[[nodiscard]] std::vector<size_t> calculate_output_num_rows_per_source(size_t const start_row,
size_t const num_rows);

rmm::cuda_stream_view _stream;
rmm::device_async_resource_ref _mr{rmm::mr::get_current_device_resource()};

Expand Down Expand Up @@ -387,7 +404,7 @@ class reader::impl {

// chunked reading happens in 2 parts:
//
// At the top level, the entire file is divided up into "passes" omn which we try and limit the
// At the top level, the entire file is divided up into "passes" on which we try and limit the
// total amount of temporary memory (compressed data, decompressed data) in use
// via _input_pass_read_limit.
//
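The pass machinery documented above is driven through the public chunked reader API. A hedged usage sketch of how the read limits and the per-chunk num_rows_per_source metadata fit together; the path and limit values are placeholders:

```cpp
#include <cudf/io/parquet.hpp>

#include <cstddef>
#include <iostream>

// Read a Parquet file in chunks, limiting both the output chunk size and the
// temporary memory used per pass. The path and limits are placeholders.
void read_in_chunks()
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{{"data.parquet"}}).build();

  // chunk_read_limit: soft cap on each returned table's size in bytes (0 = no limit)
  // pass_read_limit:  soft cap on temporary decompression memory per pass (0 = no limit)
  cudf::io::chunked_parquet_reader reader(
    /*chunk_read_limit=*/512 * 1024 * 1024, /*pass_read_limit=*/1024UL * 1024 * 1024, options);

  std::size_t chunk_id = 0;
  while (reader.has_next()) {
    auto chunk = reader.read_chunk();
    std::cout << "chunk " << chunk_id++ << ": " << chunk.tbl->num_rows() << " rows\n";
    // With this change, each chunk's metadata also reports how many of its rows
    // came from each input source (when no AST filter is used).
    for (auto const rows : chunk.metadata.num_rows_per_source) {
      std::cout << "  rows from source: " << rows << '\n';
    }
  }
}
```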
6 changes: 6 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.hpp
@@ -41,6 +41,12 @@ struct file_intermediate_data {
// is not capped by global_skip_rows and global_num_rows.
std::vector<std::size_t> input_pass_start_row_count{};

// number of rows to be read from each data source
std::vector<std::size_t> num_rows_per_source{};

// partial sum of the number of rows per data source
std::vector<std::size_t> exclusive_sum_num_rows_per_source{};

size_t _current_input_pass{0}; // current input pass index
size_t _output_chunk_count{0}; // how many output chunks we have produced

25 changes: 20 additions & 5 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -952,7 +952,7 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con
return names;
}

std::tuple<int64_t, size_type, std::vector<row_group_info>>
std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
aggregate_reader_metadata::select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t skip_rows_opt,
@@ -983,6 +983,9 @@ aggregate_reader_metadata::select_row_groups(
static_cast<size_type>(from_opts.second)};
}();

// Get number of rows in each data source
std::vector<size_t> num_rows_per_source(per_file_metadata.size(), 0);

if (!row_group_indices.empty()) {
CUDF_EXPECTS(row_group_indices.size() == per_file_metadata.size(),
"Must specify row groups for each source");
@@ -996,28 +999,40 @@
selection.emplace_back(rowgroup_idx, rows_to_read, src_idx);
// if page-level indexes are present, then collect extra chunk and page info.
column_info_for_row_group(selection.back(), 0);
rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows;
auto const rows_this_rg = get_row_group(rowgroup_idx, src_idx).num_rows;
rows_to_read += rows_this_rg;
num_rows_per_source[src_idx] += rows_this_rg;
}
}
} else {
size_type count = 0;
for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) {
auto const& fmd = per_file_metadata[src_idx];
for (size_t rg_idx = 0; rg_idx < fmd.row_groups.size(); ++rg_idx) {
for (size_t rg_idx = 0;
[Review comment, mhaseeb123, Jul 11, 2024] The loop will now stop as soon as count >= rows_to_read + rows_to_skip

rg_idx < fmd.row_groups.size() and count < rows_to_skip + rows_to_read;
++rg_idx) {
auto const& rg = fmd.row_groups[rg_idx];
auto const chunk_start_row = count;
count += rg.num_rows;
if (count > rows_to_skip || count == 0) {
num_rows_per_source[src_idx] += count;
num_rows_per_source[src_idx] -= (chunk_start_row < rows_to_skip and count > rows_to_skip)
? rows_to_skip
: chunk_start_row;

selection.emplace_back(rg_idx, chunk_start_row, src_idx);
// if page-level indexes are present, then collect extra chunk and page info.
column_info_for_row_group(selection.back(), chunk_start_row);
}
if (count >= rows_to_skip + rows_to_read) { break; }
// Adjust the number of rows for the last source file.
if (count >= rows_to_skip + rows_to_read) {
num_rows_per_source[src_idx] -= count - rows_to_skip - rows_to_read;
}
}
}
}

return {rows_to_skip, rows_to_read, std::move(selection)};
return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)};
}

std::tuple<std::vector<input_column_info>,
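The selection loop above interleaves row-group selection with per-source row accounting, clipping each source's count to the requested window [rows_to_skip, rows_to_skip + rows_to_read). The following standalone sketch isolates just that bookkeeping over plain row-group sizes; the group sizes and window in the example comment are made up:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// For each source, sum the rows its row groups contribute to the global window
// [rows_to_skip, rows_to_skip + rows_to_read), mirroring the selection loop above.
std::vector<std::size_t> rows_per_source_in_window(
  std::vector<std::vector<std::size_t>> const& row_group_sizes_per_source,
  std::size_t rows_to_skip,
  std::size_t rows_to_read)
{
  std::vector<std::size_t> num_rows_per_source(row_group_sizes_per_source.size(), 0);
  std::size_t count = 0;  // global row count seen so far

  for (std::size_t src = 0; src < row_group_sizes_per_source.size(); ++src) {
    for (auto const rg_rows : row_group_sizes_per_source[src]) {
      // Stop as soon as the requested window has been covered.
      if (count >= rows_to_skip + rows_to_read) { return num_rows_per_source; }
      auto const chunk_start_row = count;
      count += rg_rows;
      if (count > rows_to_skip) {
        // Start counting at rows_to_skip if this row group straddles the skip boundary.
        num_rows_per_source[src] += count - std::max(chunk_start_row, rows_to_skip);
        // Trim rows that fall past the end of the requested window.
        if (count > rows_to_skip + rows_to_read) {
          num_rows_per_source[src] -= count - rows_to_skip - rows_to_read;
        }
      }
    }
  }
  return num_rows_per_source;
}

// Example: two sources with row groups {10, 10} and {10}; skip 5 rows, read 20.
// Source 0 contributes 15 rows (global rows 5..19) and source 1 contributes 5 (rows 20..24).
```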
20 changes: 10 additions & 10 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -279,17 +279,17 @@ class aggregate_reader_metadata {
* @param output_column_schemas schema indices of output columns
* @param filter Optional AST expression to filter row groups based on Column chunk statistics
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A tuple of corrected row_start, row_count and list of row group indexes and its
* starting row
* @return A tuple of corrected row_start, row_count, list of row group indexes and its
* starting row, and list of number of rows per source.
*/
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<row_group_info>> select_row_groups(
host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>>
select_row_groups(host_span<std::vector<size_type> const> row_group_indices,
int64_t row_start,
std::optional<size_type> const& row_count,
host_span<data_type const> output_dtypes,
host_span<int const> output_column_schemas,
std::optional<std::reference_wrapper<ast::expression const>> filter,
rmm::cuda_stream_view stream) const;

/**
* @brief Filters and reduces down to a selection of columns
19 changes: 15 additions & 4 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1235,8 +1235,10 @@ void reader::impl::preprocess_file(read_mode mode)
[](auto const& col) { return col.type; });
}

std::tie(
_file_itm_data.global_skip_rows, _file_itm_data.global_num_rows, _file_itm_data.row_groups) =
std::tie(_file_itm_data.global_skip_rows,
_file_itm_data.global_num_rows,
_file_itm_data.row_groups,
_file_itm_data.num_rows_per_source) =
_metadata->select_row_groups(_options.row_group_indices,
_options.skip_rows,
_options.num_rows,
@@ -1245,9 +1247,18 @@
_expr_conv.get_converted_expr(),
_stream);

// Inclusive scan the number of rows per source
[Review comment, mhaseeb123, Jul 11, 2024] Only need to compute the vector of partial sums if the read mode is chunked. The partial sums allow us to binary search into them and compute, at low cost, the number of rows per source for each individual chunk.

if (not _expr_conv.get_converted_expr().has_value() and mode == read_mode::CHUNKED_READ) {
_file_itm_data.exclusive_sum_num_rows_per_source.resize(
_file_itm_data.num_rows_per_source.size());
thrust::inclusive_scan(_file_itm_data.num_rows_per_source.cbegin(),
_file_itm_data.num_rows_per_source.cend(),
_file_itm_data.exclusive_sum_num_rows_per_source.begin());
}

// check for page indexes
_has_page_index = std::all_of(_file_itm_data.row_groups.begin(),
_file_itm_data.row_groups.end(),
_has_page_index = std::all_of(_file_itm_data.row_groups.cbegin(),
_file_itm_data.row_groups.cend(),
[](auto const& row_group) { return row_group.has_page_index(); });

if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() &&