diff --git a/cpp/src/io/parquet/decode_preprocess.cu b/cpp/src/io/parquet/decode_preprocess.cu
index 19c398c5965..8d8bed8f8bf 100644
--- a/cpp/src/io/parquet/decode_preprocess.cu
+++ b/cpp/src/io/parquet/decode_preprocess.cu
@@ -375,9 +375,10 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   if (!t) {
     s->page.skipped_values      = -1;
     s->page.skipped_leaf_values = 0;
-    s->page.str_bytes           = 0;
-    s->input_row_count          = 0;
-    s->input_value_count        = 0;
+    // str_bytes_from_index will be 0 if no page stats are present
+    s->page.str_bytes    = s->page.str_bytes_from_index;
+    s->input_row_count   = 0;
+    s->input_value_count = 0;
 
     // in the base pass, we're computing the number of rows, make sure we visit absolutely
     // everything
@@ -462,7 +463,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   }
 
   // retrieve total string size.
-  if (compute_string_sizes) {
+  if (compute_string_sizes && !pp->has_page_index) {
     auto const str_bytes = gpuDecodeTotalPageStringSize(s, t);
     if (t == 0) { s->page.str_bytes = str_bytes; }
   }
diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu
index 0dae0724823..f502fc837d6 100644
--- a/cpp/src/io/parquet/page_hdr.cu
+++ b/cpp/src/io/parquet/page_hdr.cu
@@ -385,14 +385,19 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
         // this computation is only valid for flat schemas. for nested schemas,
         // they will be recomputed in the preprocess step by examining repetition and
         // definition levels
-        bs->page.chunk_row           = 0;
-        bs->page.num_rows            = 0;
-        bs->page.skipped_values      = -1;
-        bs->page.skipped_leaf_values = 0;
-        bs->page.str_bytes           = 0;
-        bs->page.temp_string_size    = 0;
-        bs->page.temp_string_buf     = nullptr;
-        bs->page.kernel_mask         = decode_kernel_mask::NONE;
+        bs->page.chunk_row            = 0;
+        bs->page.num_rows             = 0;
+        bs->page.skipped_values       = -1;
+        bs->page.skipped_leaf_values  = 0;
+        bs->page.str_bytes            = 0;
+        bs->page.str_bytes_from_index = 0;
+        bs->page.num_valids           = 0;
+        bs->page.start_val            = 0;
+        bs->page.end_val              = 0;
+        bs->page.has_page_index       = false;
+        bs->page.temp_string_size     = 0;
+        bs->page.temp_string_buf      = nullptr;
+        bs->page.kernel_mask          = decode_kernel_mask::NONE;
       }
       num_values = bs->ck.num_values;
       page_info  = chunk_pages ? chunk_pages[chunk].pages : nullptr;
diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu
index b63f96fda46..a0dfaa2fa58 100644
--- a/cpp/src/io/parquet/page_string_decode.cu
+++ b/cpp/src/io/parquet/page_string_decode.cu
@@ -599,10 +599,12 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo
   PageInfo* const pp = &pages[page_idx];
 
   if (t == 0) {
-    s->page.num_nulls  = 0;
-    s->page.num_valids = 0;
+    // don't clobber these if they're already computed from the index
+    if (!pp->has_page_index) {
+      s->page.num_nulls  = 0;
+      s->page.num_valids = 0;
+    }
     // reset str_bytes to 0 in case it's already been calculated (esp needed for chunked reads).
-    // TODO: need to rethink this once str_bytes is in the statistics
     pp->str_bytes = 0;
   }
 
@@ -632,6 +634,9 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) { return; }
+
   // find start/end value indices
   auto const [start_value, end_value] =
     page_bounds(s, min_row, num_rows, is_bounds_pg, has_repetition, decoders);
@@ -698,6 +703,15 @@ CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPage
       }
     }
   } else {
+    bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
+
+    // if we have size info, then we only need to do this for bounds pages
+    if (pp->has_page_index && !is_bounds_pg) {
+      // check if we need to store values from the index
+      if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+      return;
+    }
+
     // now process string info in the range [start_value, end_value)
     // set up for decoding strings...can be either plain or dictionary
     uint8_t const* data = s->data_start;
@@ -759,6 +773,13 @@ CUDF_KERNEL void __launch_bounds__(delta_length_block_size) gpuComputeDeltaLengt
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   // for DELTA_LENGTH_BYTE_ARRAY, string size is page_data_size - size_of_delta_binary_block.
   // so all we need to do is skip the encoded string size info and then do pointer arithmetic,
   // if this isn't a bounds page.
@@ -850,6 +871,13 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   auto const& col = s->col;
   size_t str_bytes = 0;
   // short circuit for FIXED_LEN_BYTE_ARRAY
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index af9f1f1267e..c66f69b3567 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -316,7 +316,8 @@ struct PageInfo {
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
   int32_t str_bytes;
-  int32_t str_offset;  // offset into string data for this page
+  int32_t str_offset;    // offset into string data for this page
+  bool has_page_index;   // true if str_bytes, num_valids, etc are derivable from page indexes
 
   // nesting information (input/output) for each page. this array contains
   // input column nesting information, output column nesting information and
@@ -335,8 +336,15 @@ struct PageInfo {
   uint8_t* temp_string_buf;
 
   decode_kernel_mask kernel_mask;
+
+  // str_bytes from page index. because str_bytes needs to be reset each iteration
+  // while doing chunked reads, persist the value from the page index here.
+  int32_t str_bytes_from_index;
 };
 
+// forward declaration
+struct column_chunk_info;
+
 /**
  * @brief Return the column schema id as the key for a PageInfo struct.
  */
@@ -376,6 +384,7 @@ struct ColumnChunkDesc {
                   int32_t ts_clock_rate_,
                   int32_t src_col_index_,
                   int32_t src_col_schema_,
+                  column_chunk_info const* chunk_info_,
                   float list_bytes_per_row_est_)
     : compressed_data(compressed_data_),
       compressed_size(compressed_size_),
@@ -400,6 +409,7 @@ struct ColumnChunkDesc {
       ts_clock_rate(ts_clock_rate_),
       src_col_index(src_col_index_),
       src_col_schema(src_col_schema_),
+      h_chunk_info(chunk_info_),
       list_bytes_per_row_est(list_bytes_per_row_est_)
   {
   }
@@ -430,6 +440,9 @@ struct ColumnChunkDesc {
   int32_t src_col_index{};   // my input column index
   int32_t src_col_schema{};  // my schema index in the file
 
+  // pointer to column_chunk_info struct for this chunk (host only)
+  column_chunk_info const* h_chunk_info{};
+
   float list_bytes_per_row_est{};  // for LIST columns, an estimate on number of bytes per row
 };
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index 207f908febf..89562514564 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -28,7 +28,7 @@
 
 namespace cudf::io::parquet::detail {
 
-void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
+void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
 {
   auto& pass    = *_pass_itm_data;
   auto& subpass = *pass.subpass;
@@ -62,14 +62,23 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   auto const has_strings = (kernel_mask & STRINGS_MASK) != 0;
   std::vector<size_t> col_string_sizes(_input_columns.size(), 0L);
   if (has_strings) {
-    ComputePageStringSizes(subpass.pages,
-                           pass.chunks,
-                           delta_temp_buf,
-                           skip_rows,
-                           num_rows,
-                           level_type_size,
-                           kernel_mask,
-                           _stream);
+    // need to compute pages bounds/sizes if we lack page indexes or are using custom bounds
+    // TODO: we could probably dummy up size stats for FLBA data since we know the width
+    auto const has_flba =
+      std::any_of(pass.chunks.begin(), pass.chunks.end(), [](auto const& chunk) {
+        return (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY && chunk.converted_type != DECIMAL;
+      });
+
+    if (!_has_page_index || uses_custom_row_bounds || has_flba) {
+      ComputePageStringSizes(subpass.pages,
+                             pass.chunks,
+                             delta_temp_buf,
+                             skip_rows,
+                             num_rows,
+                             level_type_size,
+                             kernel_mask,
+                             _stream);
+    }
 
     col_string_sizes = calculate_page_string_offsets();
 
@@ -426,7 +435,7 @@ table_with_metadata reader::impl::read_chunk_internal(
     allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds);
 
     // Parse data into the output buffers.
-    decode_page_data(read_info.skip_rows, read_info.num_rows);
+    decode_page_data(uses_custom_row_bounds, read_info.skip_rows, read_info.num_rows);
 
     // Create the final output cudf columns.
     for (size_t i = 0; i < _output_buffers.size(); ++i) {
diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index 67c56c9c2d7..185419a5b46 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -311,10 +311,12 @@ class reader::impl {
   /**
    * @brief Converts the page data and outputs to columns.
    *
+   * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific
+   * bounds
    * @param skip_rows Minimum number of rows from start
    * @param num_rows Number of rows to output
    */
-  void decode_page_data(size_t skip_rows, size_t num_rows);
+  void decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows);
 
   /**
    * @brief Creates file-wide parquet chunk information.
@@ -365,6 +367,10 @@ class reader::impl {
   std::unique_ptr<table_metadata> _output_metadata;
 
   bool _strings_to_categorical = false;
+
+  // are there usable page indexes available
+  bool _has_page_index = false;
+
   std::optional<std::vector<reader_column_schema>> _reader_column_schema;
   data_type _timestamp_type{type_id::EMPTY};
diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu
index b05318d3a91..9c14902ef2f 100644
--- a/cpp/src/io/parquet/reader_impl_chunking.cu
+++ b/cpp/src/io/parquet/reader_impl_chunking.cu
@@ -1476,6 +1476,28 @@ void reader::impl::create_global_chunk_info()
   auto const num_input_columns = _input_columns.size();
   auto const num_chunks        = row_groups_info.size() * num_input_columns;
 
+  // Mapping of input column to page index column
+  std::vector<size_type> column_mapping;
+
+  if (_has_page_index and not row_groups_info.empty()) {
+    // use first row group to define mappings (assumes same schema for each file)
+    auto const& rg      = row_groups_info[0];
+    auto const& columns = _metadata->get_row_group(rg.index, rg.source_index).columns;
+    column_mapping.resize(num_input_columns);
+    std::transform(
+      _input_columns.begin(), _input_columns.end(), column_mapping.begin(), [&](auto const& col) {
+        // translate schema_idx into something we can use for the page indexes
+        if (auto it = std::find_if(
+              columns.begin(),
+              columns.end(),
+              [&col](auto const& col_chunk) { return col_chunk.schema_idx == col.schema_idx; });
+            it != columns.end()) {
+          return std::distance(columns.begin(), it);
+        }
+        CUDF_FAIL("cannot find column mapping");
+      });
+  }
+
   // Initialize column chunk information
   auto remaining_rows = num_rows;
   for (auto const& rg : row_groups_info) {
@@ -1505,6 +1527,10 @@ void reader::impl::create_global_chunk_info()
                               static_cast<float>(row_group.num_rows)
                           : 0.0f;
 
+      // grab the column_chunk_info for each chunk (if it exists)
+      column_chunk_info const* const chunk_info =
+        _has_page_index ? &rg.column_chunks.value()[column_mapping[i]] : nullptr;
+
       chunks.push_back(ColumnChunkDesc(col_meta.total_compressed_size,
                                        nullptr,
                                        col_meta.num_values,
@@ -1524,6 +1550,7 @@ void reader::impl::create_global_chunk_info()
                                        clock_rate,
                                        i,
                                        col.schema_idx,
+                                       chunk_info,
                                        list_bytes_per_row_est));
     }
 
diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp
index 6f11debb8df..776caa99ac9 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.cpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -267,24 +267,45 @@ metadata::metadata(datasource* source)
   cp.read(this);
   CUDF_EXPECTS(cp.InitSchema(this), "Cannot initialize schema");
 
-  // loop through the column chunks and read column and offset indexes
-  for (auto& rg : row_groups) {
-    for (auto& col : rg.columns) {
-      if (col.column_index_length > 0 && col.column_index_offset > 0) {
-        auto const col_idx_buf =
-          source->host_read(col.column_index_offset, col.column_index_length);
-        cp.init(col_idx_buf->data(), col_idx_buf->size());
-        ColumnIndex ci;
-        cp.read(&ci);
-        col.column_index = std::move(ci);
-      }
-      if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
-        auto const off_idx_buf =
-          source->host_read(col.offset_index_offset, col.offset_index_length);
-        cp.init(off_idx_buf->data(), off_idx_buf->size());
-        OffsetIndex oi;
-        cp.read(&oi);
-        col.offset_index = std::move(oi);
+  // Reading the page indexes is somewhat expensive, so skip if there are no byte array columns.
+  // Currently the indexes are only used for the string size calculations.
+  // Could also just read indexes for string columns, but that would require changes elsewhere
+  // where we're trying to determine if we have the indexes or not.
+  // Note: This will have to be modified if there are other uses in the future (e.g. calculating
+  // chunk/pass boundaries).
+  auto const has_strings = std::any_of(
+    schema.begin(), schema.end(), [](auto const& elem) { return elem.type == BYTE_ARRAY; });
+
+  if (has_strings and not row_groups.empty() and not row_groups.front().columns.empty()) {
+    // column index and offset index are encoded back to back.
+    // the first column of the first row group will have the first column index, the last
+    // column of the last row group will have the final offset index.
+    int64_t const min_offset = row_groups.front().columns.front().column_index_offset;
+    auto const& last_col     = row_groups.back().columns.back();
+    int64_t const max_offset = last_col.offset_index_offset + last_col.offset_index_length;
+
+    if (max_offset > 0) {
+      int64_t const length = max_offset - min_offset;
+      auto const idx_buf   = source->host_read(min_offset, length);
+
+      // now loop over row groups
+      for (auto& rg : row_groups) {
+        for (auto& col : rg.columns) {
+          if (col.column_index_length > 0 && col.column_index_offset > 0) {
+            int64_t const offset = col.column_index_offset - min_offset;
+            cp.init(idx_buf->data() + offset, col.column_index_length);
+            ColumnIndex ci;
+            cp.read(&ci);
+            col.column_index = std::move(ci);
+          }
+          if (col.offset_index_length > 0 && col.offset_index_offset > 0) {
+            int64_t const offset = col.offset_index_offset - min_offset;
+            cp.init(idx_buf->data() + offset, col.offset_index_length);
+            OffsetIndex oi;
+            cp.read(&oi);
+            col.offset_index = std::move(oi);
+          }
+        }
       }
     }
   }
@@ -346,6 +367,142 @@ size_type aggregate_reader_metadata::calc_num_row_groups() const
   });
 }
 
+// Copies info from the column and offset indexes into the passed in row_group_info.
+void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_info,
+                                                          size_type chunk_start_row) const
+{
+  auto const& fmd = per_file_metadata[rg_info.source_index];
+  auto const& rg  = fmd.row_groups[rg_info.index];
+
+  std::vector<column_chunk_info> chunks(rg.columns.size());
+
+  for (size_t col_idx = 0; col_idx < rg.columns.size(); col_idx++) {
+    auto const& col_chunk    = rg.columns[col_idx];
+    auto& schema             = get_schema(col_chunk.schema_idx);
+    auto const max_def_level = schema.max_definition_level;
+    auto const max_rep_level = schema.max_repetition_level;
+
+    // If any columns lack the page indexes then just return without modifying the
+    // row_group_info.
+    if (not col_chunk.offset_index.has_value() or not col_chunk.column_index.has_value()) {
+      return;
+    }
+
+    auto const& offset_index = col_chunk.offset_index.value();
+    auto const& column_index = col_chunk.column_index.value();
+
+    auto& chunk_info     = chunks[col_idx];
+    auto const num_pages = offset_index.page_locations.size();
+
+    // There is a bug in older versions of parquet-mr where the first data page offset
+    // really points to the dictionary page. The first possible offset in a file is 4 (after
+    // the "PAR1" header), so check to see if the dictionary_page_offset is > 0. If it is, then
+    // we haven't encountered the bug.
+    if (col_chunk.meta_data.dictionary_page_offset > 0) {
+      chunk_info.dictionary_offset = col_chunk.meta_data.dictionary_page_offset;
+      chunk_info.dictionary_size =
+        col_chunk.meta_data.data_page_offset - chunk_info.dictionary_offset.value();
+    } else {
+      // dictionary_page_offset is 0, so check to see if the data_page_offset does not match
+      // the first offset in the offset index. If they don't match, then data_page_offset points
+      // to the dictionary page.
+      if (num_pages > 0 &&
+          col_chunk.meta_data.data_page_offset < offset_index.page_locations[0].offset) {
+        chunk_info.dictionary_offset = col_chunk.meta_data.data_page_offset;
+        chunk_info.dictionary_size =
+          offset_index.page_locations[0].offset - col_chunk.meta_data.data_page_offset;
+      }
+    }
+
+    // Use the definition_level_histogram to get num_valid and num_null. For now, these are
+    // only ever used for byte array columns. The repetition_level_histogram might be
+    // necessary to determine the total number of values in the page if the
+    // definition_level_histogram is absent.
+    //
+    // In the future we might want the full histograms saved in the `column_info` struct.
+    int64_t const* const def_hist = column_index.definition_level_histogram.has_value()
+                                      ? column_index.definition_level_histogram.value().data()
+                                      : nullptr;
+    int64_t const* const rep_hist = column_index.repetition_level_histogram.has_value()
+                                      ? column_index.repetition_level_histogram.value().data()
+                                      : nullptr;
+
+    for (size_t pg_idx = 0; pg_idx < num_pages; pg_idx++) {
+      auto const& page_loc = offset_index.page_locations[pg_idx];
+      // translate chunk-relative row nums to absolute within the file
+      auto const pg_start_row = chunk_start_row + page_loc.first_row_index;
+      auto const pg_end_row =
+        chunk_start_row + (pg_idx == (num_pages - 1)
+                             ? rg.num_rows
+                             : offset_index.page_locations[pg_idx + 1].first_row_index);
+
+      auto const num_rows = pg_end_row - pg_start_row;
+      page_info pg_info{page_loc, num_rows};
+
+      // check to see if we already have null counts for each page
+      if (column_index.null_counts.has_value()) {
+        pg_info.num_nulls = column_index.null_counts.value()[pg_idx];
+      }
+
+      // save variable length byte info if present
+      if (offset_index.unencoded_byte_array_data_bytes.has_value()) {
+        pg_info.var_bytes_size = offset_index.unencoded_byte_array_data_bytes.value()[pg_idx];
+      }
+
+      // if def histogram is present, then use it to calculate num_valid and num_nulls
+      if (def_hist != nullptr) {
+        auto const h      = &def_hist[pg_idx * (max_def_level + 1)];
+        pg_info.num_valid = h[max_def_level];
+
+        // calculate num_nulls if not available from column index
+        if (not pg_info.num_nulls.has_value()) {
+          pg_info.num_nulls = std::reduce(h, h + max_def_level);
+        }
+      }
+      // there is no def histogram.
+      // if there is no repetition (no lists), then num_values == num_rows, and num_nulls can be
+      // obtained from the column index
+      else if (max_rep_level == 0) {
+        // if we already have num_nulls from column index
+        if (pg_info.num_nulls.has_value()) {
+          pg_info.num_valid = pg_info.num_rows - pg_info.num_nulls.value();
+        }
+        // if max_def is 0, there are no nulls
+        else if (max_def_level == 0) {
+          pg_info.num_nulls = 0;
+          pg_info.num_valid = pg_info.num_rows;
+        }
+      }
+      // if the rep level histogram is present, we can get the total number of values
+      // from that
+      else if (rep_hist != nullptr) {
+        if (pg_info.num_nulls.has_value()) {
+          auto const h          = &rep_hist[pg_idx * (max_rep_level + 1)];
+          auto const num_values = std::reduce(h, h + max_rep_level + 1);
+          pg_info.num_valid     = num_values - pg_info.num_nulls.value();
+        }
+      }
+
+      // If none of the ifs above triggered, then we have neither histogram (likely the writer
+      // doesn't produce them, the r:0 d:1 case should have been handled above). The column index
+      // doesn't give us value counts, so we'll have to rely on the page headers. If the histogram
+      // info is missing or insufficient, then just return without modifying the row_group_info.
+      if (not pg_info.num_nulls.has_value() or not pg_info.num_valid.has_value()) { return; }
+
+      // Like above, if using older page indexes that lack size info, then return without modifying
+      // the row_group_info.
+      // TODO: cudf will still set the per-page var_bytes to '0' even for all null pages. Need to
+      // check the behavior of other implementations (once there are some). Some may not set the
+      // var bytes for all null pages, so check the `null_pages` field on the column index.
+      if (schema.type == BYTE_ARRAY and not pg_info.var_bytes_size.has_value()) { return; }
+
+      chunk_info.pages.push_back(std::move(pg_info));
+    }
+  }
+
+  rg_info.column_chunks = std::move(chunks);
+}
+
 aggregate_reader_metadata::aggregate_reader_metadata(
   host_span<std::unique_ptr<datasource> const> sources)
   : per_file_metadata(metadatas_from_sources(sources)),
@@ -470,23 +627,29 @@ aggregate_reader_metadata::select_row_groups(
                  "Must specify row groups for each source");
 
     for (size_t src_idx = 0; src_idx < row_group_indices.size(); ++src_idx) {
+      auto const& fmd = per_file_metadata[src_idx];
       for (auto const& rowgroup_idx : row_group_indices[src_idx]) {
         CUDF_EXPECTS(
-          rowgroup_idx >= 0 &&
-            rowgroup_idx < static_cast<size_type>(per_file_metadata[src_idx].row_groups.size()),
+          rowgroup_idx >= 0 && rowgroup_idx < static_cast<size_type>(fmd.row_groups.size()),
          "Invalid rowgroup index");
         selection.emplace_back(rowgroup_idx, rows_to_read, src_idx);
+        // if page-level indexes are present, then collect extra chunk and page info.
+        column_info_for_row_group(selection.back(), 0);
         rows_to_read += get_row_group(rowgroup_idx, src_idx).num_rows;
       }
     }
   } else {
     size_type count = 0;
     for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) {
-      for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) {
+      auto const& fmd = per_file_metadata[src_idx];
+      for (size_t rg_idx = 0; rg_idx < fmd.row_groups.size(); ++rg_idx) {
+        auto const& rg             = fmd.row_groups[rg_idx];
         auto const chunk_start_row = count;
-        count += get_row_group(rg_idx, src_idx).num_rows;
+        count += rg.num_rows;
        if (count > rows_to_skip || count == 0) {
           selection.emplace_back(rg_idx, chunk_start_row, src_idx);
+          // if page-level indexes are present, then collect extra chunk and page info.
+          column_info_for_row_group(selection.back(), chunk_start_row);
         }
         if (count >= rows_to_skip + rows_to_read) { break; }
       }
diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp
index 8d8ab8707be..8295654764e 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.hpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
 *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,6 +34,43 @@
 
 namespace cudf::io::parquet::detail {
 
+/**
+ * @brief page location and size info
+ */
+struct page_info {
+  // page location info from the offset index
+  PageLocation location;
+  // number of rows in the page, calculated from offset index
+  int64_t num_rows;
+  // number of valid values in page, calculated from definition level histogram if present
+  std::optional<int64_t> num_valid;
+  // number of null values in page, calculated from definition level histogram if present
+  std::optional<int64_t> num_nulls;
+  // number of bytes of variable-length data from the offset index (byte_array columns only)
+  std::optional<int64_t> var_bytes_size;
+};
+
+/**
+ * @brief column chunk metadata
+ */
+struct column_chunk_info {
+  // offset in file of the dictionary (if present)
+  std::optional<int64_t> dictionary_offset;
+  // size of dictionary (if present)
+  std::optional<int32_t> dictionary_size;
+  std::vector<page_info> pages;
+
+  /**
+   * @brief Determine if this column chunk has a dictionary page.
+   *
+   * @return `true` if this column chunk has a dictionary page.
+   */
+  [[nodiscard]] constexpr bool has_dictionary() const
+  {
+    return dictionary_offset.has_value() && dictionary_size.has_value();
+  }
+};
+
 /**
  * @brief The row_group_info class
  */
@@ -43,12 +80,20 @@ struct row_group_info {
   size_t start_row;
   size_type source_index;  // file index.
 
+  // Optional metadata pulled from the column and offset indexes, if present.
+  std::optional<std::vector<column_chunk_info>> column_chunks;
+
   row_group_info() = default;
 
   row_group_info(size_type index, size_t start_row, size_type source_index)
     : index{index}, start_row{start_row}, source_index{source_index}
   {
   }
+
+  /**
+   * @brief Indicates the presence of page-level indexes.
+   */
+  [[nodiscard]] bool has_page_index() const { return column_chunks.has_value(); }
 };
 
 /**
@@ -104,6 +149,14 @@ class aggregate_reader_metadata {
    */
   [[nodiscard]] size_type calc_num_row_groups() const;
 
+  /**
+   * @brief Calculate column index info for the given `row_group_info`
+   *
+   * @param rg_info Struct used to summarize metadata for a single row group
+   * @param chunk_start_row Global index of first row in the row group
+   */
+  void column_info_for_row_group(row_group_info& rg_info, size_type chunk_start_row) const;
+
  public:
   aggregate_reader_metadata(host_span<std::unique_ptr<datasource> const> sources);
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index aa4f96aa2e0..51a18de966e 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -309,6 +309,95 @@ void generate_depth_remappings(std::map, std::ve
   return total_pages;
 }
 
+/**
+ * @brief Count the total number of pages using page index information.
+ */
+[[nodiscard]] size_t count_page_headers_with_pgidx(
+  cudf::detail::hostdevice_vector<ColumnChunkDesc>& chunks, rmm::cuda_stream_view stream)
+{
+  size_t total_pages = 0;
+  for (auto& chunk : chunks) {
+    CUDF_EXPECTS(chunk.h_chunk_info != nullptr, "Expected non-null column info struct");
+    auto const& chunk_info = *chunk.h_chunk_info;
+    chunk.num_dict_pages   = chunk_info.has_dictionary() ? 1 : 0;
+    chunk.num_data_pages   = chunk_info.pages.size();
+    total_pages += chunk.num_data_pages + chunk.num_dict_pages;
+  }
+
+  // count_page_headers() also pushes chunks to device, so not using thrust here
+  chunks.host_to_device_async(stream);
+
+  return total_pages;
+}
+
+// struct used to carry info from the page indexes to the device
+struct page_index_info {
+  int32_t num_rows;
+  int32_t chunk_row;
+  int32_t num_nulls;
+  int32_t num_valids;
+  int32_t str_bytes;
+};
+
+// functor to copy page_index_info into the PageInfo struct
+struct copy_page_info {
+  device_span<page_index_info const> page_indexes;
+  device_span<PageInfo> pages;
+
+  __device__ void operator()(size_type idx)
+  {
+    auto& pg                = pages[idx];
+    auto const& pi          = page_indexes[idx];
+    pg.num_rows             = pi.num_rows;
+    pg.chunk_row            = pi.chunk_row;
+    pg.has_page_index       = true;
+    pg.num_nulls            = pi.num_nulls;
+    pg.num_valids           = pi.num_valids;
+    pg.str_bytes_from_index = pi.str_bytes;
+    pg.str_bytes            = pi.str_bytes;
+    pg.start_val            = 0;
+    pg.end_val              = pg.num_valids;
+  }
+};
+
+/**
+ * @brief Set fields on the pages that can be derived from page indexes.
+ *
+ * This replaces some preprocessing steps, such as page string size calculation.
+ */
+void fill_in_page_info(host_span<ColumnChunkDesc const> chunks,
+                       device_span<PageInfo> pages,
+                       rmm::cuda_stream_view stream)
+{
+  auto const num_pages = pages.size();
+  std::vector<page_index_info> page_indexes(num_pages);
+
+  for (size_t c = 0, page_count = 0; c < chunks.size(); c++) {
+    auto const& chunk = chunks[c];
+    CUDF_EXPECTS(chunk.h_chunk_info != nullptr, "Expected non-null column info struct");
+    auto const& chunk_info = *chunk.h_chunk_info;
+    size_t start_row       = 0;
+    page_count += chunk.num_dict_pages;
+    for (size_t p = 0; p < chunk_info.pages.size(); p++, page_count++) {
+      auto& page      = page_indexes[page_count];
+      page.num_rows   = chunk_info.pages[p].num_rows;
+      page.chunk_row  = start_row;
+      page.num_nulls  = chunk_info.pages[p].num_nulls.value_or(0);
+      page.num_valids = chunk_info.pages[p].num_valid.value_or(0);
+      page.str_bytes  = chunk_info.pages[p].var_bytes_size.value_or(0);
+
+      start_row += page.num_rows;
+    }
+  }
+
+  auto d_page_indexes = cudf::detail::make_device_uvector_async(
+    page_indexes, stream, rmm::mr::get_current_device_resource());
+
+  auto iter = thrust::make_counting_iterator(0);
+  thrust::for_each(
+    rmm::exec_policy_nosync(stream), iter, iter + num_pages, copy_page_info{d_page_indexes, pages});
+}
+
 /**
  * @brief Returns a string representation of known encodings
 *
@@ -445,6 +534,7 @@ cudf::detail::hostdevice_vector sort_pages(device_span
  */
 void decode_page_headers(pass_intermediate_data& pass,
                          device_span<PageInfo> unsorted_pages,
+                         bool has_page_index,
                          rmm::cuda_stream_view stream)
 {
   CUDF_FUNC_RANGE();
@@ -491,6 +581,8 @@ void decode_page_headers(pass_intermediate_data& pass,
     }
   }
 
+  if (has_page_index) { fill_in_page_info(pass.chunks, unsorted_pages, stream); }
+
   // compute max bytes needed for level data
   auto level_bit_size = cudf::detail::make_counting_transform_iterator(
     0, cuda::proclaim_return_type<int>([chunks = pass.chunks.d_begin()] __device__(int i) {
@@ -902,12 +994,13 @@ void reader::impl::read_compressed_data()
   }
 
   // Process dataset chunk pages into output columns
-  auto const total_pages = count_page_headers(chunks, _stream);
+  auto const total_pages = _has_page_index ? count_page_headers_with_pgidx(chunks, _stream)
+                                           : count_page_headers(chunks, _stream);
   if (total_pages <= 0) { return; }
   rmm::device_uvector<PageInfo> unsorted_pages(total_pages, _stream);
 
   // decoding of column/page information
-  decode_page_headers(pass, unsorted_pages, _stream);
+  decode_page_headers(pass, unsorted_pages, _has_page_index, _stream);
   CUDF_EXPECTS(pass.page_offsets.size() - 1 == static_cast<size_t>(_input_columns.size()),
                "Encountered page_offsets / num_columns mismatch");
 }
@@ -1140,6 +1233,11 @@ void reader::impl::preprocess_file(
     _metadata->select_row_groups(
       row_group_indices, skip_rows, num_rows, output_types, filter, _stream);
 
+  // check for page indexes
+  _has_page_index = std::all_of(_file_itm_data.row_groups.begin(),
+                                _file_itm_data.row_groups.end(),
+                                [](auto const& row_group) { return row_group.has_page_index(); });
+
   if (_file_itm_data.global_num_rows > 0 && not _file_itm_data.row_groups.empty() &&
       not _input_columns.empty()) {
     // fills in chunk information without physically loading or decompressing
@@ -1191,13 +1289,16 @@ void reader::impl::generate_list_column_row_count_estimates()
   // field in ColumnChunkDesc is the absolute row index for the whole file. chunk_row in PageInfo is
   // relative to the beginning of the chunk. so in the kernels, chunk.start_row + page.chunk_row
   // gives us the absolute row index
-  auto key_input  = thrust::make_transform_iterator(pass.pages.d_begin(), get_page_chunk_idx{});
-  auto page_input = thrust::make_transform_iterator(pass.pages.d_begin(), get_page_num_rows{});
-  thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
-                                key_input,
-                                key_input + pass.pages.size(),
-                                page_input,
-                                chunk_row_output_iter{pass.pages.device_ptr()});
+  // Note: chunk_row is already computed if we have column indexes
+  if (not _has_page_index) {
+    auto key_input  = thrust::make_transform_iterator(pass.pages.d_begin(), get_page_chunk_idx{});
+    auto page_input = thrust::make_transform_iterator(pass.pages.d_begin(), get_page_num_rows{});
+    thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
+                                  key_input,
+                                  key_input + pass.pages.size(),
+                                  page_input,
+                                  chunk_row_output_iter{pass.pages.device_ptr()});
+  }
 
   // finally, fudge the last page for each column such that it ends on the real known row count
   // for the pass. this is so that as we march through the subpasses, we will find that every column
diff --git a/cpp/tests/io/parquet_reader_test.cpp b/cpp/tests/io/parquet_reader_test.cpp
index abbd0c97f07..c13bf488e6a 100644
--- a/cpp/tests/io/parquet_reader_test.cpp
+++ b/cpp/tests/io/parquet_reader_test.cpp
@@ -2060,6 +2060,91 @@ TEST_F(ParquetReaderTest, DeltaSkipRowsWithNulls)
   }
 }
 
+// test that using page stats is working for full reads and various skip rows
+TEST_F(ParquetReaderTest, StringsWithPageStats)
+{
+  constexpr int num_rows = 10'000;
+  constexpr auto seed    = 21337;
+
+  std::mt19937 engine{seed};
+  auto int32_list_nulls = make_parquet_list_col<int32_t>(engine, num_rows, 5, true);
+  auto int32_list       = make_parquet_list_col<int32_t>(engine, num_rows, 5, false);
+  auto int64_list_nulls = make_parquet_list_col<int64_t>(engine, num_rows, 5, true);
+  auto int64_list       = make_parquet_list_col<int64_t>(engine, num_rows, 5, false);
+  auto int16_list_nulls = make_parquet_list_col<int16_t>(engine, num_rows, 5, true);
+  auto int16_list       = make_parquet_list_col<int16_t>(engine, num_rows, 5, false);
+  auto int8_list_nulls  = make_parquet_list_col<int8_t>(engine, num_rows, 5, true);
+  auto int8_list        = make_parquet_list_col<int8_t>(engine, num_rows, 5, false);
+
+  auto str_list_nulls     = make_parquet_string_list_col(engine, num_rows, 5, 32, true);
+  auto str_list           = make_parquet_string_list_col(engine, num_rows, 5, 32, false);
+  auto big_str_list_nulls = make_parquet_string_list_col(engine, num_rows, 5, 256, true);
+  auto big_str_list       = make_parquet_string_list_col(engine, num_rows, 5, 256, false);
+
+  auto int32_data   = random_values<int32_t>(num_rows);
+  auto int64_data   = random_values<int64_t>(num_rows);
+  auto int16_data   = random_values<int16_t>(num_rows);
+  auto int8_data    = random_values<int8_t>(num_rows);
+  auto str_data     = string_values(engine, num_rows, 32);
+  auto big_str_data = string_values(engine, num_rows, 256);
+
+  auto const validity = random_validity(engine);
+  auto const no_nulls = cudf::test::iterators::no_nulls();
+  column_wrapper<int32_t> int32_nulls_col{int32_data.begin(), int32_data.end(), validity};
+  column_wrapper<int32_t> int32_col{int32_data.begin(), int32_data.end(), no_nulls};
+  column_wrapper<int64_t> int64_nulls_col{int64_data.begin(), int64_data.end(), validity};
+  column_wrapper<int64_t> int64_col{int64_data.begin(), int64_data.end(), no_nulls};
+
+  auto str_col = cudf::test::strings_column_wrapper(str_data.begin(), str_data.end(), no_nulls);
+  auto str_col_nulls = cudf::purge_nonempty_nulls(
+    cudf::test::strings_column_wrapper(str_data.begin(), str_data.end(), validity));
+  auto big_str_col =
+    cudf::test::strings_column_wrapper(big_str_data.begin(), big_str_data.end(), no_nulls);
+  auto big_str_col_nulls = cudf::purge_nonempty_nulls(
+    cudf::test::strings_column_wrapper(big_str_data.begin(), big_str_data.end(), validity));
+
+  cudf::table_view tbl({int32_col,   int32_nulls_col,    *int32_list,   *int32_list_nulls,
+                        int64_col,   int64_nulls_col,    *int64_list,   *int64_list_nulls,
+                        *int16_list, *int16_list_nulls,  *int8_list,    *int8_list_nulls,
+                        str_col,     *str_col_nulls,     *str_list,     *str_list_nulls,
+                        big_str_col, *big_str_col_nulls, *big_str_list, *big_str_list_nulls});
+
+  auto const filepath = temp_env->get_temp_filepath("StringsWithPageStats.parquet");
+  auto const out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tbl)
+      .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN)
+      .max_page_size_rows(5'000)
+      .build();
+  cudf::io::write_parquet(out_opts);
+
+  // skip_rows / num_rows
+  // clang-format off
+  std::vector<std::pair<int, int>> params{
+    // skip and then read rest of file
+    {-1, -1}, {1, -1}, {2, -1}, {32, -1}, {33, -1}, {128, -1}, {1'000, -1},
+    // no skip but truncate
+    {0, 1'000}, {0, 6'000},
+    // cross page boundaries
+    {3'000, 5'000}
+  };
+
+  // clang-format on
+  for (auto p : params) {
+    cudf::io::parquet_reader_options read_args =
+      cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+    if (p.first >= 0) { read_args.set_skip_rows(p.first); }
+    if (p.second >= 0) { read_args.set_num_rows(p.second); }
+    auto result = cudf::io::read_parquet(read_args);
+
+    p.first  = p.first < 0 ? 0 : p.first;
+    p.second = p.second < 0 ? num_rows - p.first : p.second;
+    std::vector<cudf::size_type> slice_indices{p.first, p.first + p.second};
+    std::vector<cudf::table_view> expected = cudf::slice(tbl, slice_indices);
+
+    CUDF_TEST_EXPECT_TABLES_EQUAL(result.tbl->view(), expected[0]);
+  }
+}
+
 ///////////////////
 // metadata tests