Shuffling read into a sub function in parquet read #12809

Merged
15 changes: 13 additions & 2 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -130,10 +130,21 @@ class reader::impl {
bool uses_custom_row_bounds,
host_span<std::vector<size_type> const> row_group_indices);

+ /**
+ * @brief Create chunk information and start file reads
+ *
+ * @param row_groups_info vector of information about row groups to read
+ * @param num_rows Maximum number of rows to read
+ * @return pair of boolean indicating if compressed chunks were found and a vector of futures for
+ * read completion
[Review thread on this line]
Contributor: suggested a whitespace-only change to the `* read completion` continuation line (the two versions render identically here).

Contributor Author: I would like to push it out as well, but pre-commit applied that format and changes it back.
+ */
+ std::pair<bool, std::vector<std::future<void>>> create_and_read_column_chunks(
+ cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows);

/**
* @brief Load and decompress the input file(s) into memory.
*/
- void load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
+ void load_and_decompress_data(cudf::host_span<row_group_info const> const row_groups_info,
size_type num_rows);

/**
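Both declarations above now take `cudf::host_span<row_group_info const>` rather than `std::vector<row_group_info> const&` — `cudf::host_span` being cudf's non-owning view over contiguous host memory. As a minimal sketch of why a span-style parameter is the looser contract, here is the same idea using C++20 `std::span` as a stand-in for `cudf::host_span` (the `sum` function and the containers below are illustrative only, not cudf code):

```cpp
#include <array>
#include <cstdio>
#include <span>
#include <vector>

// Accepts any contiguous sequence of ints, read-only, without copying and
// without forcing callers to materialize a std::vector first.
int sum(std::span<int const> values)
{
  int total = 0;
  for (int v : values) { total += v; }
  return total;
}

int main()
{
  std::vector<int> vec{1, 2, 3};
  std::array<int, 3> arr{4, 5, 6};
  // Both convert implicitly to the span; no copies are made.
  std::printf("%d %d\n", sum(vec), sum(arr));
  return 0;
}
```

The caller keeps ownership; the callee promises not to grow, shrink, or retain the sequence — which is exactly what these reader functions need from `row_groups_info`.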
43 changes: 26 additions & 17 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -651,16 +651,11 @@ void reader::impl::allocate_nesting_info()
page_nesting_decode_info.host_to_device(_stream);
}

- void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
- size_type num_rows)
+ std::pair<bool, std::vector<std::future<void>>> reader::impl::create_and_read_column_chunks(
+ cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
{
- // This function should never be called if `num_rows == 0`.
- CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero.");

- auto& raw_page_data = _file_itm_data.raw_page_data;
- auto& decomp_page_data = _file_itm_data.decomp_page_data;
- auto& chunks = _file_itm_data.chunks;
- auto& pages_info = _file_itm_data.pages_info;
+ auto& raw_page_data = _file_itm_data.raw_page_data;
+ auto& chunks        = _file_itm_data.chunks;

// Descriptors for all the chunks that make up the selected columns
const auto num_input_columns = _input_columns.size();
Expand Down Expand Up @@ -732,7 +727,7 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& r
total_decompressed_size += col_meta.total_uncompressed_size;
}
}
- remaining_rows -= row_group.num_rows;
+ remaining_rows -= row_group_rows;
}

// Read compressed chunk data to device memory
@@ -745,27 +740,41 @@ void reader::impl::load_and_decompress_data(std::vector<row_group_info> const& row_groups_info,
chunk_source_map,
_stream));

+ CUDF_EXPECTS(remaining_rows == 0, "All rows data must be read.");
+
+ return {total_decompressed_size > 0, std::move(read_rowgroup_tasks)};
+ }

+ void reader::impl::load_and_decompress_data(
+ cudf::host_span<row_group_info const> const row_groups_info, size_type num_rows)
+ {
+ // This function should never be called if `num_rows == 0`.
+ CUDF_EXPECTS(num_rows > 0, "Number of reading rows must not be zero.");
+
+ auto& raw_page_data    = _file_itm_data.raw_page_data;
+ auto& decomp_page_data = _file_itm_data.decomp_page_data;
+ auto& chunks           = _file_itm_data.chunks;
+ auto& pages_info       = _file_itm_data.pages_info;
+
+ auto const [has_compressed_data, read_rowgroup_tasks] =
+ create_and_read_column_chunks(row_groups_info, num_rows);

for (auto& task : read_rowgroup_tasks) {
task.wait();
}

- CUDF_EXPECTS(remaining_rows <= 0, "All rows data must be read.");

// Process dataset chunk pages into output columns
auto const total_pages = count_page_headers(chunks, _stream);
pages_info = hostdevice_vector<gpu::PageInfo>(total_pages, total_pages, _stream);

if (total_pages > 0) {
// decoding of column/page information
decode_page_headers(chunks, pages_info, _stream);
- if (total_decompressed_size > 0) {
+ if (has_compressed_data) {
decomp_page_data = decompress_page_data(chunks, pages_info, _stream);
// Free compressed data
for (size_t c = 0; c < chunks.size(); c++) {
- if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) {
- raw_page_data[c].reset();
- // TODO: Check if this is called
- }
+ if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { raw_page_data[c].reset(); }
}
}
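The loop just above releases each compressed input buffer as soon as its decompressed copy exists, so peak memory stays near one copy per chunk rather than two. A small self-contained illustration of the same free-after-decompress pattern — `Chunk` and the buffer vector here are stand-ins for cudf's internal types, not the real ones:

```cpp
#include <cstdio>
#include <memory>
#include <vector>

enum class Compression { UNCOMPRESSED, SNAPPY };

struct Chunk {
  Compression codec;
};

int main()
{
  std::vector<Chunk> chunks{{Compression::SNAPPY}, {Compression::UNCOMPRESSED}};

  // One raw buffer per chunk, analogous to _file_itm_data.raw_page_data.
  std::vector<std::unique_ptr<std::vector<char>>> raw_page_data;
  for (std::size_t c = 0; c < chunks.size(); ++c) {
    raw_page_data.push_back(std::make_unique<std::vector<char>>(1024));
  }

  // ... decompression would materialize uncompressed copies here ...

  // The compressed originals are now dead weight; drop them early.
  for (std::size_t c = 0; c < chunks.size(); ++c) {
    if (chunks[c].codec != Compression::UNCOMPRESSED) { raw_page_data[c].reset(); }
  }

  std::printf("buffer 0 freed: %s\n", raw_page_data[0] ? "no" : "yes");
  std::printf("buffer 1 kept:  %s\n", raw_page_data[1] ? "yes" : "no");
  return 0;
}
```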
