-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Report number of rows per file read by PQ reader when no row selection and fix segfault in chunked PQ reader when skip_rows > 0 #16195
Changes from 31 commits
c249f05
3891e3a
4bc569e
d3863a6
794d59c
ebdfad5
0fd6890
a294c18
d268873
702b0ee
d031af9
1a207e9
78ed6d1
7ad6179
975b7c3
e826caf
fa33f7a
0ac70b2
bcc3bec
363b0da
8c5816b
e239382
6f7d203
c19e972
9208a80
513d3bb
c11b27d
dd19f52
a641037
6ae6f07
91e4735
ed4352a
02b32ac
71e2d4d
1439189
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1232,22 +1232,17 @@ void reader::impl::setup_next_pass(read_mode mode) | |
pass.skip_rows = _file_itm_data.global_skip_rows; | ||
pass.num_rows = _file_itm_data.global_num_rows; | ||
} else { | ||
auto const global_start_row = _file_itm_data.global_skip_rows; | ||
auto const global_end_row = global_start_row + _file_itm_data.global_num_rows; | ||
auto const start_row = | ||
std::max(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass], | ||
global_start_row); | ||
_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. All diff in this file only fixes the arithmetic that leads to segfault for |
||
auto const end_row = | ||
std::min(_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass + 1], | ||
global_end_row); | ||
_file_itm_data.global_num_rows); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. How come we don't need to account for skip_rows here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Really good question: This is because There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I hope this makes sense lol. Here's an example. Say
|
||
|
||
// skip_rows is always global in the sense that it is relative to the first row of | ||
// everything we will be reading, regardless of what pass we are on. | ||
// num_rows is how many rows we are reading this pass. | ||
pass.skip_rows = | ||
global_start_row + | ||
_file_itm_data.input_pass_start_row_count[_file_itm_data._current_input_pass]; | ||
pass.num_rows = end_row - start_row; | ||
pass.skip_rows = _file_itm_data.global_skip_rows + start_row; | ||
pass.num_rows = end_row - start_row; | ||
} | ||
|
||
// load page information for the chunk. this retrieves the compressed bytes for all the | ||
|
@@ -1509,6 +1504,7 @@ void reader::impl::create_global_chunk_info() | |
|
||
// Initialize column chunk information | ||
auto remaining_rows = num_rows; | ||
auto skip_rows = _file_itm_data.global_skip_rows; | ||
for (auto const& rg : row_groups_info) { | ||
auto const& row_group = _metadata->get_row_group(rg.index, rg.source_index); | ||
auto const row_group_start = rg.start_row; | ||
|
@@ -1561,7 +1557,12 @@ void reader::impl::create_global_chunk_info() | |
schema.type == BYTE_ARRAY and _strings_to_categorical)); | ||
} | ||
|
||
remaining_rows -= row_group_rows; | ||
// Adjust for skip_rows when updating the remaining rows after the first group | ||
remaining_rows -= | ||
(skip_rows) ? std::min<int>(rg.start_row + row_group.num_rows - skip_rows, remaining_rows) | ||
: row_group_rows; | ||
// Set skip_rows = 0 as it is no longer needed for subsequent row_groups | ||
skip_rows = 0; | ||
} | ||
} | ||
|
||
|
@@ -1598,6 +1599,9 @@ void reader::impl::compute_input_passes() | |
_file_itm_data.input_pass_row_group_offsets.push_back(0); | ||
_file_itm_data.input_pass_start_row_count.push_back(0); | ||
|
||
// To handle global_skip_rows when computing input passes | ||
int skip_rows = _file_itm_data.global_skip_rows; | ||
|
||
for (size_t cur_rg_index = 0; cur_rg_index < row_groups_info.size(); cur_rg_index++) { | ||
auto const& rgi = row_groups_info[cur_rg_index]; | ||
auto const& row_group = _metadata->get_row_group(rgi.index, rgi.source_index); | ||
|
@@ -1606,14 +1610,22 @@ void reader::impl::compute_input_passes() | |
auto const [compressed_rg_size, _ /*compressed + uncompressed*/] = | ||
get_row_group_size(row_group); | ||
|
||
// We must use the effective size of the first row group we are reading to accurately calculate | ||
// the first non-zero input_pass_start_row_count. | ||
auto const row_group_rows = | ||
(skip_rows) ? rgi.start_row + row_group.num_rows - skip_rows : row_group.num_rows; | ||
|
||
// Set skip_rows = 0 as it is no longer needed for subsequent row_groups | ||
skip_rows = 0; | ||
|
||
// can we add this row group | ||
if (cur_pass_byte_size + compressed_rg_size >= comp_read_limit) { | ||
// A single row group (the current one) is larger than the read limit: | ||
// We always need to include at least one row group, so end the pass at the end of the current | ||
// row group | ||
if (cur_rg_start == cur_rg_index) { | ||
_file_itm_data.input_pass_row_group_offsets.push_back(cur_rg_index + 1); | ||
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group.num_rows); | ||
_file_itm_data.input_pass_start_row_count.push_back(cur_row_count + row_group_rows); | ||
cur_rg_start = cur_rg_index + 1; | ||
cur_pass_byte_size = 0; | ||
} | ||
|
@@ -1627,7 +1639,7 @@ void reader::impl::compute_input_passes() | |
} else { | ||
cur_pass_byte_size += compressed_rg_size; | ||
} | ||
cur_row_count += row_group.num_rows; | ||
cur_row_count += row_group_rows; | ||
} | ||
|
||
// add the last pass if necessary | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -945,7 +945,7 @@ std::vector<std::string> aggregate_reader_metadata::get_pandas_index_names() con | |
return names; | ||
} | ||
|
||
std::tuple<int64_t, size_type, std::vector<row_group_info>> | ||
std::tuple<int64_t, size_type, std::vector<row_group_info>, std::vector<size_t>> | ||
aggregate_reader_metadata::select_row_groups( | ||
host_span<std::vector<size_type> const> row_group_indices, | ||
int64_t skip_rows_opt, | ||
|
@@ -976,6 +976,9 @@ aggregate_reader_metadata::select_row_groups( | |
static_cast<size_type>(from_opts.second)}; | ||
}(); | ||
|
||
// Get number of rows in each data source | ||
std::vector<size_t> num_rows_per_source(per_file_metadata.size(), 0); | ||
|
||
if (!row_group_indices.empty()) { | ||
CUDF_EXPECTS(row_group_indices.size() == per_file_metadata.size(), | ||
"Must specify row groups for each source"); | ||
|
@@ -989,28 +992,46 @@ aggregate_reader_metadata::select_row_groups( | |
selection.emplace_back(rowgroup_idx, rows_to_read, src_idx); | ||
// if page-level indexes are present, then collect extra chunk and page info. | ||
column_info_for_row_group(selection.back(), 0); | ||
rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows; | ||
auto const rows_this_rg = get_row_group(rowgroup_idx, src_idx).num_rows; | ||
rows_to_read += rows_this_rg; | ||
num_rows_per_source[src_idx] += rows_this_rg; | ||
} | ||
} | ||
} else { | ||
size_type count = 0; | ||
for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { | ||
auto const& fmd = per_file_metadata[src_idx]; | ||
for (size_t rg_idx = 0; rg_idx < fmd.row_groups.size(); ++rg_idx) { | ||
for (size_t rg_idx = 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The loop will now stop as soon as count >= rows_to_read + rows_to_skip |
||
rg_idx < fmd.row_groups.size() and count < rows_to_skip + rows_to_read; | ||
++rg_idx) { | ||
auto const& rg = fmd.row_groups[rg_idx]; | ||
auto const chunk_start_row = count; | ||
count += rg.num_rows; | ||
if (count > rows_to_skip || count == 0) { | ||
// start row of this row group adjusted with rows_to_skip | ||
num_rows_per_source[src_idx] += count; | ||
mhaseeb123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
num_rows_per_source[src_idx] -= (chunk_start_row <= rows_to_skip and count > rows_to_skip) | ||
mhaseeb123 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
? rows_to_skip | ||
: chunk_start_row; | ||
|
||
// We need the unadjusted start index of this row group to correctly initialize | ||
// ColumnChunkDesc for this row group in create_global_chunk_info() and calculate | ||
// the row offset for the first pass in compute_input_passes(). | ||
selection.emplace_back(rg_idx, chunk_start_row, src_idx); | ||
// if page-level indexes are present, then collect extra chunk and page info. | ||
|
||
// If page-level indexes are present, then collect extra chunk and page info. | ||
// The page indexes rely on absolute row numbers, not adjusted for skip_rows. | ||
column_info_for_row_group(selection.back(), chunk_start_row); | ||
} | ||
if (count >= rows_to_skip + rows_to_read) { break; } | ||
// Adjust the number of rows for the last source file. | ||
if (count >= rows_to_skip + rows_to_read) { | ||
num_rows_per_source[src_idx] -= count - rows_to_skip - rows_to_read; | ||
} | ||
} | ||
} | ||
} | ||
|
||
return {rows_to_skip, rows_to_read, std::move(selection)}; | ||
return {rows_to_skip, rows_to_read, std::move(selection), std::move(num_rows_per_source)}; | ||
} | ||
|
||
std::tuple<std::vector<input_column_info>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
Binary search the lower and upper index into `partial_sum_nrows_source` and compute the number of rows seen per source in between.