Centralize chunked reading code in the parquet reader to reader_impl_chunking.cu (#14262)

As a precursor to further chunked reader work, this PR centralizes chunk-related code (mostly from the `reader::impl` class) into `reader_impl_chunking.cu` and `reader_impl_chunking.hpp`. It also cleans up some variable naming and placement in `reader::impl` and in the `file_intermediate_data` and `pass_intermediate_data` classes.
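For context, the chunked reading machinery this PR reorganizes is ultimately driven through libcudf's public `chunked_parquet_reader` interface. A minimal usage sketch (the file name and the 512 MB limit are illustrative values, not part of this change):

```cpp
#include <cudf/io/parquet.hpp>

int main()
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{"example.parquet"}).build();

  // Cap each output chunk at roughly 512 MB of decoded data (0 would mean "no limit").
  cudf::io::chunked_parquet_reader reader(512 * 1024 * 1024, options);

  // Each call to read_chunk() materializes the next slice of the file as a table.
  while (reader.has_next()) {
    auto chunk = reader.read_chunk();
    // ... use chunk.tbl ...
  }
  return 0;
}
```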

Authors:
  - https://github.com/nvdbaranec

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Robert Maynard (https://github.com/robertmaynard)
  - Nghia Truong (https://github.com/ttnghia)

URL: #14262
nvdbaranec authored Oct 10, 2023
1 parent e345620 commit b4fd77b
Showing 9 changed files with 751 additions and 654 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -401,6 +401,7 @@ add_library(
   src/io/parquet/predicate_pushdown.cpp
   src/io/parquet/reader.cpp
   src/io/parquet/reader_impl.cpp
+  src/io/parquet/reader_impl_chunking.cu
   src/io/parquet/reader_impl_helpers.cpp
   src/io/parquet/reader_impl_preprocess.cu
   src/io/parquet/writer_impl.cu
73 changes: 0 additions & 73 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -318,79 +318,6 @@ struct ColumnChunkDesc {
   int32_t src_col_schema{};  // my schema index in the file
 };
 
-/**
- * @brief The row_group_info class
- */
-struct row_group_info {
-  size_type index;  // row group index within a file. aggregate_reader_metadata::get_row_group() is
-                    // called with index and source_index
-  size_t start_row;
-  size_type source_index;  // file index.
-
-  row_group_info() = default;
-
-  row_group_info(size_type index, size_t start_row, size_type source_index)
-    : index{index}, start_row{start_row}, source_index{source_index}
-  {
-  }
-};
-
-/**
- * @brief Struct to store file-level data that remains constant for
- * all passes/chunks for the file.
- */
-struct file_intermediate_data {
-  // all row groups to read
-  std::vector<row_group_info> row_groups{};
-
-  // all chunks from the selected row groups. We may end up reading these chunks progressively
-  // instead of all at once
-  std::vector<ColumnChunkDesc> chunks{};
-
-  // skip_rows/num_rows values for the entire file. these need to be adjusted per-pass because we
-  // may not be visiting every row group that contains these bounds
-  size_t global_skip_rows;
-  size_t global_num_rows;
-};
-
-/**
- * @brief Struct to identify the reading row range for each chunk of rows in chunked reading.
- */
-struct chunk_read_info {
-  size_t skip_rows;
-  size_t num_rows;
-};
-
-/**
- * @brief Struct to store pass-level data that remains constant for a single pass.
- */
-struct pass_intermediate_data {
-  std::vector<std::unique_ptr<datasource::buffer>> raw_page_data;
-  rmm::device_buffer decomp_page_data;
-
-  // rowgroup, chunk and page information for the current pass.
-  std::vector<row_group_info> row_groups{};
-  cudf::detail::hostdevice_vector<ColumnChunkDesc> chunks{};
-  cudf::detail::hostdevice_vector<PageInfo> pages_info{};
-  cudf::detail::hostdevice_vector<PageNestingInfo> page_nesting_info{};
-  cudf::detail::hostdevice_vector<PageNestingDecodeInfo> page_nesting_decode_info{};
-
-  rmm::device_uvector<int32_t> page_keys{0, rmm::cuda_stream_default};
-  rmm::device_uvector<int32_t> page_index{0, rmm::cuda_stream_default};
-  rmm::device_uvector<string_index_pair> str_dict_index{0, rmm::cuda_stream_default};
-
-  std::vector<chunk_read_info> output_chunk_read_info;
-  std::size_t current_output_chunk{0};
-
-  rmm::device_buffer level_decode_data{};
-  int level_type_size{0};
-
-  // skip_rows and num_rows values for this particular pass. these may be adjusted values from the
-  // global values stored in file_intermediate_data.
-  size_t skip_rows;
-  size_t num_rows;
-};
-
 /**
  * @brief Struct describing an encoder column
  */
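The split between these two relocated structs mirrors the file/pass hierarchy: `file_intermediate_data` holds the file-wide `global_skip_rows`/`global_num_rows` window, while each `pass_intermediate_data` carries that window clamped to the rows its row groups actually cover. A simplified host-side sketch of that adjustment (`clamp_to_pass` is a hypothetical helper for illustration, not code from this PR):

```cpp
#include <algorithm>
#include <cstddef>

struct pass_bounds {
  std::size_t skip_rows;  // pass-local equivalent of global_skip_rows
  std::size_t num_rows;   // pass-local equivalent of global_num_rows
};

// Intersect the file-wide row window [global_skip_rows, global_skip_rows + global_num_rows)
// with one pass's row range [pass_start, pass_end).
pass_bounds clamp_to_pass(std::size_t global_skip_rows,
                          std::size_t global_num_rows,
                          std::size_t pass_start,
                          std::size_t pass_end)
{
  auto const begin = std::max(global_skip_rows, pass_start);
  auto const end   = std::min(global_skip_rows + global_num_rows, pass_end);
  return {begin, end > begin ? end - begin : 0};
}
```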
12 changes: 6 additions & 6 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -349,22 +349,22 @@ void reader::impl::prepare_data(int64_t skip_rows,
         not _input_columns.empty()) {
       // fills in chunk information without physically loading or decompressing
       // the associated data
-      load_global_chunk_info();
+      create_global_chunk_info();
 
       // compute schedule of input reads. Each rowgroup contains 1 chunk per column. For now
       // we will read an entire row group at a time. However, it is possible to do
       // sub-rowgroup reads if we made some estimates on individual chunk sizes (tricky) and
       // changed the high level structure such that we weren't always reading an entire table's
       // worth of columns at once.
-      compute_input_pass_row_group_info();
+      compute_input_passes();
     }
 
     _file_preprocessed = true;
   }
 
   // if we have to start a new pass, do that now
   if (!_pass_preprocessed) {
-    auto const num_passes = _input_pass_row_group_offsets.size() - 1;
+    auto const num_passes = _file_itm_data.input_pass_row_group_offsets.size() - 1;
 
     // always create the pass struct, even if we end up with no passes.
     // this will also cause the previous pass information to be deleted
@@ -373,7 +373,7 @@
     if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() &&
         not _input_columns.empty() && _current_input_pass < num_passes) {
       // setup the pass_intermediate_info for this pass.
-      setup_pass();
+      setup_next_pass();
 
       load_and_decompress_data();
       preprocess_pages(uses_custom_row_bounds, _output_chunk_read_limit);
@@ -541,8 +541,8 @@ bool reader::impl::has_next()
     {} /*row_group_indices, empty means read all row groups*/,
     std::nullopt /*filter*/);
 
-  auto const num_input_passes =
-    _input_pass_row_group_offsets.size() == 0 ? 0 : _input_pass_row_group_offsets.size() - 1;
+  size_t const num_input_passes = std::max(
+    int64_t{0}, static_cast<int64_t>(_file_itm_data.input_pass_row_group_offsets.size()) - 1);
   return (_pass_itm_data->current_output_chunk < _pass_itm_data->output_chunk_read_info.size()) ||
          (_current_input_pass < num_input_passes);
 }
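The `has_next()` change above is worth a note: `input_pass_row_group_offsets` stores one boundary offset per pass plus a terminator, so N passes occupy N + 1 entries, and the old `size() == 0 ? 0 : size() - 1` guard is replaced by a signed subtraction clamped at zero, which avoids unsigned wraparound when the vector is empty. A standalone sketch of the corrected computation:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// N passes are stored as N + 1 boundary offsets; an empty vector means zero passes.
// Subtracting 1 from .size() directly would wrap to SIZE_MAX for an empty vector,
// so the subtraction is done in int64_t and clamped at zero.
std::size_t num_passes(std::vector<std::size_t> const& offsets)
{
  return static_cast<std::size_t>(
    std::max(int64_t{0}, static_cast<int64_t>(offsets.size()) - 1));
}
```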
49 changes: 34 additions & 15 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -22,6 +22,7 @@
 #pragma once
 
 #include "parquet_gpu.hpp"
+#include "reader_impl_chunking.hpp"
 #include "reader_impl_helpers.hpp"
 
 #include <cudf/io/datasource.hpp>
@@ -136,10 +137,6 @@ class reader::impl {
     host_span<std::vector<size_type> const> row_group_indices,
     std::optional<std::reference_wrapper<ast::expression const>> filter);
 
-  void load_global_chunk_info();
-  void compute_input_pass_row_group_info();
-  void setup_pass();
-
   /**
    * @brief Create chunk information and start file reads
    *
@@ -250,6 +247,31 @@
    */
   void decode_page_data(size_t skip_rows, size_t num_rows);
 
+  /**
+   * @brief Creates file-wide parquet chunk information.
+   *
+   * Creates information about all chunks in the file, storing it in
+   * the file-wide _file_itm_data structure.
+   */
+  void create_global_chunk_info();
+
+  /**
+   * @brief Computes all of the passes we will perform over the file.
+   */
+  void compute_input_passes();
+
+  /**
+   * @brief Close out the existing pass (if any) and prepare for the next pass.
+   */
+  void setup_next_pass();
+
+  /**
+   * @brief Given a set of pages that have had their sizes computed by nesting level and
+   * a limit on total read size, generate a set of {skip_rows, num_rows} pairs representing
+   * a set of reads that will generate output columns of total size <= `chunk_read_limit` bytes.
+   */
+  void compute_splits_for_pass();
+
  private:
   rmm::cuda_stream_view _stream;
   rmm::mr::device_memory_resource* _mr = nullptr;
@@ -278,27 +300,24 @@

   // chunked reading happens in 2 parts:
   //
-  // At the top level there is the "pass" in which we try and limit the
+  // At the top level, the entire file is divided up into "passes" on which we try and limit the
   // total amount of temporary memory (compressed data, decompressed data) in use
   // via _input_pass_read_limit.
   //
   // Within a pass, we produce one or more chunks of output, whose maximum total
   // byte size is controlled by _output_chunk_read_limit.
 
   file_intermediate_data _file_itm_data;
-  bool _file_preprocessed{false};
 
   std::unique_ptr<pass_intermediate_data> _pass_itm_data;
-  bool _pass_preprocessed{false};
-
-  // an array of offsets into _file_itm_data::global_chunks. Each pair of offsets represents
-  // the start/end of the chunks to be loaded for a given pass.
-  std::vector<std::size_t> _input_pass_row_group_offsets{};
-  std::vector<std::size_t> _input_pass_row_count{};
-  std::size_t _current_input_pass{0};
-  std::size_t _chunk_count{0};
-  std::size_t _output_chunk_read_limit{0};  // output chunk size limit in bytes
-  std::size_t _input_pass_read_limit{0};    // input pass memory usage limit in bytes
 
+  std::size_t _output_chunk_read_limit{0};
+  std::size_t _input_pass_read_limit{0};
+  bool _pass_preprocessed{false};
+  bool _file_preprocessed{false};
+  std::size_t _current_input_pass{0};  // current input pass index
+  std::size_t _chunk_count{0};         // how many output chunks we have produced
 };
 
 }  // namespace cudf::io::parquet::detail
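Finally, `compute_splits_for_pass()` is where the output chunking actually bites: it converts per-page size information into the `{skip_rows, num_rows}` pairs stored in `output_chunk_read_info`. The real implementation computes cumulative page sizes on the device; the host-side sketch below only illustrates the splitting idea, under the simplifying assumption that a cumulative output-byte count per row is already available:

```cpp
#include <cstddef>
#include <vector>

struct chunk_read_info {
  std::size_t skip_rows;
  std::size_t num_rows;
};

// Greedy splitting: walk the cumulative output size after each row and cut a new
// output chunk whenever adding the next row would exceed chunk_read_limit. A single
// row larger than the limit still gets its own chunk, so progress is always made.
std::vector<chunk_read_info> compute_splits(std::vector<std::size_t> const& cumulative_bytes,
                                            std::size_t chunk_read_limit)
{
  std::vector<chunk_read_info> splits;
  std::size_t start_row  = 0;  // first row of the chunk being built
  std::size_t start_size = 0;  // cumulative bytes consumed before start_row
  for (std::size_t row = 0; row < cumulative_bytes.size(); ++row) {
    bool const is_last = (row + 1 == cumulative_bytes.size());
    bool const next_row_overflows =
      !is_last && (cumulative_bytes[row + 1] - start_size > chunk_read_limit);
    if (is_last || next_row_overflows) {
      splits.push_back({start_row, row + 1 - start_row});
      start_row  = row + 1;
      start_size = cumulative_bytes[row];
    }
  }
  return splits;
}
```

In the reader these splits are what `read_chunk()` walks through, one entry per returned table.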