Use page statistics in Parquet reader (#14973)

#14000 added the ability to write new page-level size statistics in the Parquet writer. This PR uses those statistics in the Parquet reader to skip some string size computations. Benchmarks show an improvement in read times of up to 20%.
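
As a rough illustration of the approach (not the actual cuDF code): when the Parquet page index already records a page's total string size, the reader can reuse that value instead of scanning the encoded page data. In the sketch below, PageInfo, str_bytes_from_index, and has_page_index mirror fields touched by this change, while scan_page_for_string_size is a hypothetical stand-in for the existing size-computation kernel.

#include <cstdint>

// Trimmed-down PageInfo with only the members relevant to this sketch.
struct PageInfo {
  int32_t str_bytes{0};             // total string bytes in this page
  int32_t str_bytes_from_index{0};  // size carried over from the page index statistics
  bool has_page_index{false};       // true when usable page-index stats are present
};

// Hypothetical stand-in for the decode-preprocess kernel that scans the
// encoded page data to measure string sizes (the expensive path).
int32_t scan_page_for_string_size(PageInfo const& /*page*/) { return 0; }

int32_t page_string_bytes(PageInfo& page)
{
  if (page.has_page_index) {
    // Fast path: reuse the size recorded in the page index and skip the scan.
    page.str_bytes = page.str_bytes_from_index;
  } else {
    // Fallback: measure string sizes from the page data as before.
    page.str_bytes = scan_page_for_string_size(page);
  }
  return page.str_bytes;
}

In the real reader the decision also accounts for custom row bounds and fixed-length byte arrays, as the reader_impl.cpp hunk below shows.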

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #14973
etseidl authored Mar 7, 2024
1 parent 352d686 commit efae666
Showing 11 changed files with 550 additions and 59 deletions.
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/decode_preprocess.cu
@@ -375,9 +375,10 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   if (!t) {
     s->page.skipped_values      = -1;
     s->page.skipped_leaf_values = 0;
-    s->page.str_bytes           = 0;
-    s->input_row_count          = 0;
-    s->input_value_count        = 0;
+    // str_bytes_from_index will be 0 if no page stats are present
+    s->page.str_bytes    = s->page.str_bytes_from_index;
+    s->input_row_count   = 0;
+    s->input_value_count = 0;
 
     // in the base pass, we're computing the number of rows, make sure we visit absolutely
     // everything
@@ -462,7 +463,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   }
 
   // retrieve total string size.
-  if (compute_string_sizes) {
+  if (compute_string_sizes && !pp->has_page_index) {
     auto const str_bytes = gpuDecodeTotalPageStringSize(s, t);
     if (t == 0) { s->page.str_bytes = str_bytes; }
   }
21 changes: 13 additions & 8 deletions cpp/src/io/parquet/page_hdr.cu
@@ -385,14 +385,19 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
   // this computation is only valid for flat schemas. for nested schemas,
   // they will be recomputed in the preprocess step by examining repetition and
   // definition levels
-  bs->page.chunk_row           = 0;
-  bs->page.num_rows            = 0;
-  bs->page.skipped_values      = -1;
-  bs->page.skipped_leaf_values = 0;
-  bs->page.str_bytes           = 0;
-  bs->page.temp_string_size    = 0;
-  bs->page.temp_string_buf     = nullptr;
-  bs->page.kernel_mask         = decode_kernel_mask::NONE;
+  bs->page.chunk_row            = 0;
+  bs->page.num_rows             = 0;
+  bs->page.skipped_values       = -1;
+  bs->page.skipped_leaf_values  = 0;
+  bs->page.str_bytes            = 0;
+  bs->page.str_bytes_from_index = 0;
+  bs->page.num_valids           = 0;
+  bs->page.start_val            = 0;
+  bs->page.end_val              = 0;
+  bs->page.has_page_index       = false;
+  bs->page.temp_string_size     = 0;
+  bs->page.temp_string_buf      = nullptr;
+  bs->page.kernel_mask          = decode_kernel_mask::NONE;
 }
 num_values = bs->ck.num_values;
 page_info = chunk_pages ? chunk_pages[chunk].pages : nullptr;
34 changes: 31 additions & 3 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -599,10 +599,12 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo
   PageInfo* const pp = &pages[page_idx];
 
   if (t == 0) {
-    s->page.num_nulls  = 0;
-    s->page.num_valids = 0;
+    // don't clobber these if they're already computed from the index
+    if (!pp->has_page_index) {
+      s->page.num_nulls  = 0;
+      s->page.num_valids = 0;
+    }
     // reset str_bytes to 0 in case it's already been calculated (esp needed for chunked reads).
-    // TODO: need to rethink this once str_bytes is in the statistics
     pp->str_bytes = 0;
   }
 
@@ -632,6 +634,9 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo

   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) { return; }
+
   // find start/end value indices
   auto const [start_value, end_value] =
     page_bounds(s, min_row, num_rows, is_bounds_pg, has_repetition, decoders);
@@ -698,6 +703,15 @@ CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPage
     }
   }
 } else {
+  bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
+
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   // now process string info in the range [start_value, end_value)
   // set up for decoding strings...can be either plain or dictionary
   uint8_t const* data = s->data_start;
@@ -759,6 +773,13 @@ CUDF_KERNEL void __launch_bounds__(delta_length_block_size) gpuComputeDeltaLengt

   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   // for DELTA_LENGTH_BYTE_ARRAY, string size is page_data_size - size_of_delta_binary_block.
   // so all we need to do is skip the encoded string size info and then do pointer arithmetic,
   // if this isn't a bounds page.
@@ -850,6 +871,13 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi

   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   auto const& col = s->col;
   size_t str_bytes = 0;
   // short circuit for FIXED_LEN_BYTE_ARRAY
15 changes: 14 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
@@ -316,7 +316,8 @@ struct PageInfo {
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
   int32_t str_bytes;
-  int32_t str_offset;  // offset into string data for this page
+  int32_t str_offset;   // offset into string data for this page
+  bool has_page_index;  // true if str_bytes, num_valids, etc are derivable from page indexes
 
   // nesting information (input/output) for each page. this array contains
   // input column nesting information, output column nesting information and
@@ -335,8 +336,15 @@ struct PageInfo {
   uint8_t* temp_string_buf;
 
   decode_kernel_mask kernel_mask;
+
+  // str_bytes from page index. because str_bytes needs to be reset each iteration
+  // while doing chunked reads, persist the value from the page index here.
+  int32_t str_bytes_from_index;
 };
 
+// forward declaration
+struct column_chunk_info;
+
 /**
  * @brief Return the column schema id as the key for a PageInfo struct.
  */
@@ -376,6 +384,7 @@ struct ColumnChunkDesc {
                   int32_t ts_clock_rate_,
                   int32_t src_col_index_,
                   int32_t src_col_schema_,
+                  column_chunk_info const* chunk_info_,
                   float list_bytes_per_row_est_)
     : compressed_data(compressed_data_),
       compressed_size(compressed_size_),
@@ -400,6 +409,7 @@ struct ColumnChunkDesc {
       ts_clock_rate(ts_clock_rate_),
       src_col_index(src_col_index_),
       src_col_schema(src_col_schema_),
+      h_chunk_info(chunk_info_),
       list_bytes_per_row_est(list_bytes_per_row_est_)
   {
   }
@@ -430,6 +440,9 @@ struct ColumnChunkDesc {
   int32_t src_col_index{};   // my input column index
   int32_t src_col_schema{};  // my schema index in the file
 
+  // pointer to column_chunk_info struct for this chunk (host only)
+  column_chunk_info const* h_chunk_info{};
+
   float list_bytes_per_row_est{};  // for LIST columns, an estimate on number of bytes per row
 };
 
29 changes: 19 additions & 10 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -28,7 +28,7 @@

 namespace cudf::io::parquet::detail {
 
-void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
+void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
 {
   auto& pass = *_pass_itm_data;
   auto& subpass = *pass.subpass;
@@ -62,14 +62,23 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   auto const has_strings = (kernel_mask & STRINGS_MASK) != 0;
   std::vector<size_t> col_string_sizes(_input_columns.size(), 0L);
   if (has_strings) {
-    ComputePageStringSizes(subpass.pages,
-                           pass.chunks,
-                           delta_temp_buf,
-                           skip_rows,
-                           num_rows,
-                           level_type_size,
-                           kernel_mask,
-                           _stream);
+    // need to compute pages bounds/sizes if we lack page indexes or are using custom bounds
+    // TODO: we could probably dummy up size stats for FLBA data since we know the width
+    auto const has_flba =
+      std::any_of(pass.chunks.begin(), pass.chunks.end(), [](auto const& chunk) {
+        return (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY && chunk.converted_type != DECIMAL;
+      });
+
+    if (!_has_page_index || uses_custom_row_bounds || has_flba) {
+      ComputePageStringSizes(subpass.pages,
+                             pass.chunks,
+                             delta_temp_buf,
+                             skip_rows,
+                             num_rows,
+                             level_type_size,
+                             kernel_mask,
+                             _stream);
+    }
 
     col_string_sizes = calculate_page_string_offsets();
 
@@ -426,7 +435,7 @@ table_with_metadata reader::impl::read_chunk_internal(
   allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds);
 
   // Parse data into the output buffers.
-  decode_page_data(read_info.skip_rows, read_info.num_rows);
+  decode_page_data(uses_custom_row_bounds, read_info.skip_rows, read_info.num_rows);
 
   // Create the final output cudf columns.
   for (size_t i = 0; i < _output_buffers.size(); ++i) {
8 changes: 7 additions & 1 deletion cpp/src/io/parquet/reader_impl.hpp
@@ -311,10 +311,12 @@ class reader::impl {
   /**
    * @brief Converts the page data and outputs to columns.
    *
+   * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific
+   * bounds
    * @param skip_rows Minimum number of rows from start
    * @param num_rows Number of rows to output
    */
-  void decode_page_data(size_t skip_rows, size_t num_rows);
+  void decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows);
 
   /**
    * @brief Creates file-wide parquet chunk information.
@@ -365,6 +367,10 @@ class reader::impl {
   std::unique_ptr<table_metadata> _output_metadata;
 
   bool _strings_to_categorical = false;
+
+  // are there usable page indexes available
+  bool _has_page_index = false;
+
   std::optional<std::vector<reader_column_schema>> _reader_column_schema;
   data_type _timestamp_type{type_id::EMPTY};
 
27 changes: 27 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.cu
@@ -1476,6 +1476,28 @@ void reader::impl::create_global_chunk_info()
   auto const num_input_columns = _input_columns.size();
   auto const num_chunks = row_groups_info.size() * num_input_columns;
 
+  // Mapping of input column to page index column
+  std::vector<size_type> column_mapping;
+
+  if (_has_page_index and not row_groups_info.empty()) {
+    // use first row group to define mappings (assumes same schema for each file)
+    auto const& rg = row_groups_info[0];
+    auto const& columns = _metadata->get_row_group(rg.index, rg.source_index).columns;
+    column_mapping.resize(num_input_columns);
+    std::transform(
+      _input_columns.begin(), _input_columns.end(), column_mapping.begin(), [&](auto const& col) {
+        // translate schema_idx into something we can use for the page indexes
+        if (auto it = std::find_if(
+              columns.begin(),
+              columns.end(),
+              [&col](auto const& col_chunk) { return col_chunk.schema_idx == col.schema_idx; });
+            it != columns.end()) {
+          return std::distance(columns.begin(), it);
+        }
+        CUDF_FAIL("cannot find column mapping");
+      });
+  }
+
   // Initialize column chunk information
   auto remaining_rows = num_rows;
   for (auto const& rg : row_groups_info) {
@@ -1505,6 +1527,10 @@ void reader::impl::create_global_chunk_info()
                                    static_cast<float>(row_group.num_rows)
                                : 0.0f;
 
+      // grab the column_chunk_info for each chunk (if it exists)
+      column_chunk_info const* const chunk_info =
+        _has_page_index ? &rg.column_chunks.value()[column_mapping[i]] : nullptr;
+
       chunks.push_back(ColumnChunkDesc(col_meta.total_compressed_size,
                                        nullptr,
                                        col_meta.num_values,
@@ -1524,6 +1550,7 @@ void reader::impl::create_global_chunk_info()
                                        clock_rate,
                                        i,
                                        col.schema_idx,
+                                       chunk_info,
                                        list_bytes_per_row_est));
     }
 