Centralize chunked reading code in the parquet reader to reader_impl_chunking.cu #14262

Merged
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -401,6 +401,7 @@ add_library(
src/io/parquet/predicate_pushdown.cpp
src/io/parquet/reader.cpp
src/io/parquet/reader_impl.cpp
src/io/parquet/reader_impl_chunking.cu
src/io/parquet/reader_impl_helpers.cpp
src/io/parquet/reader_impl_preprocess.cu
src/io/parquet/writer_impl.cu
73 changes: 0 additions & 73 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -318,79 +318,6 @@ struct ColumnChunkDesc {
int32_t src_col_schema{}; // my schema index in the file
};

/**
* @brief The row_group_info class
*/
struct row_group_info {
size_type index; // row group index within a file. aggregate_reader_metadata::get_row_group() is
// called with index and source_index
size_t start_row;
size_type source_index; // file index.

row_group_info() = default;

row_group_info(size_type index, size_t start_row, size_type source_index)
: index{index}, start_row{start_row}, source_index{source_index}
{
}
};

/**
* @brief Struct to store file-level data that remains constant for
* all passes/chunks for the file.
*/
struct file_intermediate_data {
// all row groups to read
std::vector<row_group_info> row_groups{};

// all chunks from the selected row groups. We may end up reading these chunks progressively
// instead of all at once
std::vector<ColumnChunkDesc> chunks{};

// skip_rows/num_rows values for the entire file. these need to be adjusted per-pass because we
// may not be visiting every row group that contains these bounds
size_t global_skip_rows;
size_t global_num_rows;
};

/**
 * @brief Struct to identify the reading row range for each chunk of rows in chunked reading.
*/
struct chunk_read_info {
size_t skip_rows;
size_t num_rows;
};

/**
* @brief Struct to store pass-level data that remains constant for a single pass.
*/
struct pass_intermediate_data {
std::vector<std::unique_ptr<datasource::buffer>> raw_page_data;
rmm::device_buffer decomp_page_data;

// rowgroup, chunk and page information for the current pass.
std::vector<row_group_info> row_groups{};
cudf::detail::hostdevice_vector<ColumnChunkDesc> chunks{};
cudf::detail::hostdevice_vector<PageInfo> pages_info{};
cudf::detail::hostdevice_vector<PageNestingInfo> page_nesting_info{};
cudf::detail::hostdevice_vector<PageNestingDecodeInfo> page_nesting_decode_info{};

rmm::device_uvector<int32_t> page_keys{0, rmm::cuda_stream_default};
rmm::device_uvector<int32_t> page_index{0, rmm::cuda_stream_default};
rmm::device_uvector<string_index_pair> str_dict_index{0, rmm::cuda_stream_default};

std::vector<chunk_read_info> output_chunk_read_info;
std::size_t current_output_chunk{0};

rmm::device_buffer level_decode_data{};
int level_type_size{0};

// skip_rows and num_rows values for this particular pass. these may be adjusted values from the
// global values stored in file_intermediate_data.
size_t skip_rows;
size_t num_rows;
};

/**
* @brief Struct describing an encoder column
*/
12 changes: 6 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -349,22 +349,22 @@ void reader::impl::prepare_data(int64_t skip_rows,
not _input_columns.empty()) {
// fills in chunk information without physically loading or decompressing
// the associated data
load_global_chunk_info();
create_global_chunk_info();

// compute schedule of input reads. Each rowgroup contains 1 chunk per column. For now
// we will read an entire row group at a time. However, it is possible to do
// sub-rowgroup reads if we made some estimates on individual chunk sizes (tricky) and
// changed the high level structure such that we weren't always reading an entire table's
// worth of columns at once.
compute_input_pass_row_group_info();
compute_input_passes();
}

_file_preprocessed = true;
}

// if we have to start a new pass, do that now
if (!_pass_preprocessed) {
auto const num_passes = _input_pass_row_group_offsets.size() - 1;
auto const num_passes = _file_itm_data.input_pass_row_group_offsets.size() - 1;

// always create the pass struct, even if we end up with no passes.
// this will also cause the previous pass information to be deleted
@@ -373,7 +373,7 @@
if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() &&
not _input_columns.empty() && _current_input_pass < num_passes) {
// setup the pass_intermediate_info for this pass.
setup_pass();
setup_next_pass();

load_and_decompress_data();
preprocess_pages(uses_custom_row_bounds, _output_chunk_read_limit);
@@ -541,8 +541,8 @@ bool reader::impl::has_next()
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);

auto const num_input_passes =
_input_pass_row_group_offsets.size() == 0 ? 0 : _input_pass_row_group_offsets.size() - 1;
size_t const num_input_passes = std::max(
int64_t{0}, static_cast<int64_t>(_file_itm_data.input_pass_row_group_offsets.size()) - 1);
return (_pass_itm_data->current_output_chunk < _pass_itm_data->output_chunk_read_info.size()) ||
(_current_input_pass < num_input_passes);
}
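
To make the pass scheduling above concrete, here is a minimal sketch of the idea behind compute_input_passes() and the input_pass_row_group_offsets array: row groups are grouped greedily so that each pass's estimated compressed size stays under the input read limit, and num_passes is offsets.size() - 1, matching the computation in prepare_data() and has_next(). The function name, signature, and the per-row-group size input are illustrative assumptions, not the PR's actual implementation.

// Hypothetical sketch (not the PR's code): greedily assign row groups to passes so
// that each pass's estimated compressed size stays under the input read limit.
// Returns offsets into the row group list; each adjacent pair of offsets is one pass.
#include <cstddef>
#include <vector>

std::vector<std::size_t> compute_pass_offsets(std::vector<std::size_t> const& row_group_sizes,
                                              std::size_t input_pass_read_limit)
{
  std::vector<std::size_t> offsets{0};
  std::size_t current_pass_size = 0;
  for (std::size_t i = 0; i < row_group_sizes.size(); ++i) {
    // close the current pass if adding this row group would exceed the limit
    if (current_pass_size > 0 && current_pass_size + row_group_sizes[i] > input_pass_read_limit) {
      offsets.push_back(i);
      current_pass_size = 0;
    }
    current_pass_size += row_group_sizes[i];
  }
  offsets.push_back(row_group_sizes.size());  // final offset closes the last pass
  return offsets;
}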
49 changes: 34 additions & 15 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -22,6 +22,7 @@
#pragma once

#include "parquet_gpu.hpp"
#include "reader_impl_chunking.hpp"
#include "reader_impl_helpers.hpp"

#include <cudf/io/datasource.hpp>
@@ -136,10 +137,6 @@ class reader::impl {
host_span<std::vector<size_type> const> row_group_indices,
std::optional<std::reference_wrapper<ast::expression const>> filter);

void load_global_chunk_info();
void compute_input_pass_row_group_info();
void setup_pass();

/**
* @brief Create chunk information and start file reads
*
Expand Down Expand Up @@ -250,6 +247,31 @@ class reader::impl {
*/
void decode_page_data(size_t skip_rows, size_t num_rows);

/**
* @brief Creates file-wide parquet chunk information.
*
* Creates information about all chunks in the file, storing it in
* the file-wide _file_itm_data structure.
*/
void create_global_chunk_info();

/**
* @brief Computes all of the passes we will perform over the file.
*/
void compute_input_passes();

/**
* @brief Close out the existing pass (if any) and prepare for the next pass.
*/
void setup_next_pass();

/**
* @brief Given a set of pages that have had their sizes computed by nesting level and
* a limit on total read size, generate a set of {skip_rows, num_rows} pairs representing
* a set of reads that will generate output columns of total size <= `chunk_read_limit` bytes.
*/
void compute_splits_for_pass();

private:
rmm::cuda_stream_view _stream;
rmm::mr::device_memory_resource* _mr = nullptr;
@@ -278,27 +300,24 @@

// chunked reading happens in 2 parts:
//
// At the top level there is the "pass" in which we try and limit the
  // At the top level, the entire file is divided up into "passes" in which we try and limit the
// total amount of temporary memory (compressed data, decompressed data) in use
// via _input_pass_read_limit.
//
// Within a pass, we produce one or more chunks of output, whose maximum total
// byte size is controlled by _output_chunk_read_limit.

file_intermediate_data _file_itm_data;
bool _file_preprocessed{false};

std::unique_ptr<pass_intermediate_data> _pass_itm_data;
bool _pass_preprocessed{false};

// an array of offsets into _file_itm_data::global_chunks. Each pair of offsets represents
// the start/end of the chunks to be loaded for a given pass.
std::vector<std::size_t> _input_pass_row_group_offsets{};
std::vector<std::size_t> _input_pass_row_count{};
std::size_t _current_input_pass{0};
std::size_t _chunk_count{0};
std::size_t _output_chunk_read_limit{0}; // output chunk size limit in bytes
std::size_t _input_pass_read_limit{0}; // input pass memory usage limit in bytes

std::size_t _output_chunk_read_limit{0};
std::size_t _input_pass_read_limit{0};
bool _pass_preprocessed{false};
bool _file_preprocessed{false};
std::size_t _current_input_pass{0}; // current input pass index
std::size_t _chunk_count{0}; // how many output chunks we have produced
};

} // namespace cudf::io::parquet::detail
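
As a rough illustration of the algorithm described in the compute_splits_for_pass() doc comment, the sketch below scans cumulative per-row output sizes and emits {skip_rows, num_rows} pairs whose decoded size stays at or under the output chunk limit. The split_info struct (mirroring chunk_read_info), the cumulative-size input, and all names here are assumptions for illustration; the PR's actual implementation works from page-level size information computed on the device.

// Hypothetical sketch: split one pass into output chunks of bounded decoded size.
// cumulative_row_sizes[i] = total output bytes of rows [0, i] within this pass.
#include <cstddef>
#include <vector>

struct split_info {
  std::size_t skip_rows;
  std::size_t num_rows;
};

std::vector<split_info> compute_splits(std::vector<std::size_t> const& cumulative_row_sizes,
                                       std::size_t pass_skip_rows,
                                       std::size_t chunk_read_limit)
{
  std::vector<split_info> splits;
  if (cumulative_row_sizes.empty()) { return splits; }

  std::size_t chunk_start = 0;  // first row of the chunk currently being built
  std::size_t chunk_base  = 0;  // cumulative bytes already emitted in earlier chunks
  for (std::size_t row = 0; row < cumulative_row_sizes.size(); ++row) {
    // close the chunk just before this row if including it would exceed the limit
    // (every chunk keeps at least one row, so an oversized single row is still emitted)
    if (row > chunk_start && cumulative_row_sizes[row] - chunk_base > chunk_read_limit) {
      splits.push_back({pass_skip_rows + chunk_start, row - chunk_start});
      chunk_base  = cumulative_row_sizes[row - 1];
      chunk_start = row;
    }
  }
  // final chunk with whatever rows remain
  splits.push_back({pass_skip_rows + chunk_start, cumulative_row_sizes.size() - chunk_start});
  return splits;
}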