
Parquet reader: bug fix for a num_rows/skip_rows corner case, w/optimization for nested preprocessing #11752

Merged
78 changes: 38 additions & 40 deletions cpp/src/io/parquet/page_data.cu
@@ -1175,7 +1175,8 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
int t)
{
// max nesting depth of the column
-  int const max_depth = s->col.max_nesting_depth;
+  int const max_depth       = s->col.max_nesting_depth;
+  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
// how many (input) values we've processed in the page so far
int input_value_count = s->input_value_count;
// how many rows we've processed in the page so far
@@ -1235,7 +1236,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
uint32_t const warp_valid_mask =
// for flat schemas, a simple ballot_sync gives us the correct count and bit positions
// because every value in the input matches to a value in the output
-    max_depth == 1
+    !has_repetition
? ballot(is_valid)
:
// for nested schemas, it's more complicated. This warp will visit 32 incoming values,
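To make the flat-schema fast path above concrete, here is a minimal standalone CUDA sketch (hypothetical names like `def_levels`/`max_def_level`; not the PR's code) of how a single `__ballot_sync` yields both the validity bit positions and, via `__popc`, the valid count when every input value maps 1:1 to an output row:

```cpp
#include <cstdint>
#include <cstdio>

// one warp processes 32 definition levels; a value is non-null when its
// definition level reaches the maximum for the column
__global__ void flat_validity(int const* def_levels, int max_def_level,
                              uint32_t* valid_mask, int* valid_count)
{
  int const t         = threadIdx.x;
  bool const is_valid = def_levels[t] == max_def_level;
  // flat schema: input value i == output row i, so one ballot is enough
  uint32_t const mask = __ballot_sync(0xffffffffu, is_valid);
  if (t == 0) {
    *valid_mask  = mask;
    *valid_count = __popc(mask);
  }
}

int main()
{
  int h_def[32];
  for (int i = 0; i < 32; i++) { h_def[i] = (i % 4 == 0) ? 0 : 1; }  // every 4th value null
  int *d_def, *d_count;
  uint32_t* d_mask;
  cudaMalloc(&d_def, sizeof(h_def));
  cudaMalloc(&d_mask, sizeof(uint32_t));
  cudaMalloc(&d_count, sizeof(int));
  cudaMemcpy(d_def, h_def, sizeof(h_def), cudaMemcpyHostToDevice);
  flat_validity<<<1, 32>>>(d_def, 1, d_mask, d_count);
  uint32_t mask;
  int count;
  cudaMemcpy(&mask, d_mask, sizeof(mask), cudaMemcpyDeviceToHost);
  cudaMemcpy(&count, d_count, sizeof(count), cudaMemcpyDeviceToHost);
  printf("valid_mask=0x%08x valid_count=%d\n", mask, count);  // 24 valid
  return 0;
}
```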
@@ -1284,11 +1285,12 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
// the correct position to start reading. since we are about to write the validity vector here
// we need to adjust our computed mask to take into account the write row bounds.
int const in_write_row_bounds =
-    max_depth == 1
+    !has_repetition
? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows)
: in_row_bounds;
int const first_thread_in_write_range =
-    max_depth == 1 ? __ffs(ballot(in_write_row_bounds)) - 1 : 0;
+    !has_repetition ? __ffs(ballot(in_write_row_bounds)) - 1 : 0;

// # of bits of the validity mask to write out
int const warp_valid_mask_bit_count =
first_thread_in_write_range < 0 ? 0 : warp_value_count - first_thread_in_write_range;
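The `__ffs(ballot(...)) - 1` idiom above (finding the first warp lane whose row is inside the write bounds, or -1 when none is) can be sketched in isolation like this (hypothetical standalone kernel, assuming lane index equals row index):

```cpp
#include <cstdio>

__global__ void first_lane_in_range(int first_row, int num_rows, int* out)
{
  int const t          = threadIdx.x;  // one warp; pretend lane t holds row t
  bool const in_bounds = t >= first_row && t < first_row + num_rows;
  // __ffs returns the 1-based position of the lowest set bit (0 if none),
  // so subtracting 1 gives the first in-bounds lane, or -1 if there is none
  int const first = __ffs((int)__ballot_sync(0xffffffffu, in_bounds)) - 1;
  if (t == 0) { *out = first; }
}

int main()
{
  int* d_out;
  cudaMalloc(&d_out, sizeof(int));
  first_lane_in_range<<<1, 32>>>(/*first_row=*/5, /*num_rows=*/10, d_out);
  int first;
  cudaMemcpy(&first, d_out, sizeof(int), cudaMemcpyDeviceToHost);
  printf("first lane in write range: %d\n", first);  // 5
  return 0;
}
```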
@@ -1384,7 +1386,6 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s,
{
// max nesting depth of the column
int max_depth = s->col.max_nesting_depth;
-  // bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false;
// how many input level values we've processed in the page so far
int input_value_count = s->input_value_count;
// how many leaf values we've processed in the page so far
@@ -1479,6 +1480,11 @@ __global__ void __launch_bounds__(block_size)
int t = threadIdx.x;
PageInfo* pp = &pages[page_idx];

+  // we only need to preprocess hierarchies with repetition in them (ie, hierarchies
+  // containing lists anywhere within).
+  bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;
+  if (!has_repetition) { return; }

if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) {
return;
}
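The early return above is the core of the preprocessing optimization: only repetition levels (introduced by lists) make output sizes data-dependent; struct nesting alone does not. A hedged host-side illustration with hypothetical schema nodes:

```cpp
#include <cstdio>

struct SchemaLeaf {
  int max_definition_level;  // grows with every optional/nested level
  int max_repetition_level;  // grows only when a list is on the path
};

// mirrors the kernel's test: preprocessing is needed only when the
// hierarchy contains a list somewhere, i.e. repetition levels exist
bool needs_preprocess(SchemaLeaf const& leaf) { return leaf.max_repetition_level > 0; }

int main()
{
  SchemaLeaf flat{1, 0};           // optional int
  SchemaLeaf nested_struct{2, 0};  // struct<int>: deeper, but still no lists
  SchemaLeaf list{2, 1};           // list<int>: repetition present
  printf("flat=%d struct=%d list=%d\n",
         needs_preprocess(flat),           // 0: skipped
         needs_preprocess(nested_struct),  // 0: skipped by the early return
         needs_preprocess(list));          // 1: preprocessed
  return 0;
}
```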
@@ -1504,8 +1510,6 @@ __global__ void __launch_bounds__(block_size)
}
__syncthreads();

-  bool has_repetition = s->col.max_level[level_type::REPETITION] > 0;
-
// optimization : it might be useful to have a version of gpuDecodeStream that could go wider than
// 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step
// when in the actual value decoding kernel. However, during this preprocess step we have no such
@@ -1516,16 +1520,13 @@
while (!s->error && s->input_value_count < s->num_input_values) {
// decode repetition and definition levels. these will attempt to decode at
// least up to the target, but may decode a few more.
-      if (has_repetition) {
-        gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION);
-      }
+      gpuDecodeStream(s->rep, s, target_input_count, t, level_type::REPETITION);
gpuDecodeStream(s->def, s, target_input_count, t, level_type::DEFINITION);
__syncwarp();

// we may have decoded different amounts from each stream, so only process what we've been able to decode in both
-    int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION],
-                                                  s->lvl_count[level_type::DEFINITION])
-                                            : s->lvl_count[level_type::DEFINITION];
+    int actual_input_count =
+      min(s->lvl_count[level_type::REPETITION], s->lvl_count[level_type::DEFINITION]);

// process what we got back
gpuUpdatePageSizes(s, actual_input_count, t, trim_pass);
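The loop structure above (decode both level streams toward a moving target, then process only the minimum both have produced) can be simulated on the host; a hypothetical standalone sketch:

```cpp
#include <algorithm>
#include <cstdio>

int main()
{
  int const num_input_values = 100;
  int processed              = 0;
  while (processed < num_input_values) {
    int const target = std::min(processed + 32, num_input_values);
    // each stream decodes at least to the target but may overshoot a little,
    // and by different amounts (pretend overshoots of 3 and 1 here)
    int const rep_decoded = std::min(target + 3, num_input_values);
    int const def_decoded = std::min(target + 1, num_input_values);
    // only process values for which BOTH level streams are available
    int const actual_input_count = std::min(rep_decoded, def_decoded);
    printf("processed %3d -> %3d\n", processed, actual_input_count);
    processed = actual_input_count;
  }
  return 0;
}
```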
@@ -1573,6 +1574,8 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
}

+  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
@@ -1625,7 +1628,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData(
// - so we will end up ignoring the first two input rows, and input rows 2..n will
// get written to the output starting at position 0.
//
-    if (s->col.max_nesting_depth == 1) { dst_pos -= s->first_row; }
+    if (!has_repetition) { dst_pos -= s->first_row; }

// target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values
// before first_row) in the flat hierarchy case.
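The comment above reduces to simple index arithmetic for flat hierarchies; a tiny host-side rendering (standalone, not the kernel code):

```cpp
#include <cstdio>

int main()
{
  int const first_row = 2;  // skip_rows
  int const num_rows  = 3;
  for (int src_pos = 0; src_pos < 6; src_pos++) {
    int const dst_pos = src_pos - first_row;  // may be negative
    if (dst_pos >= 0 && dst_pos < num_rows) {
      printf("input value %d -> output row %d\n", src_pos, dst_pos);
    } else {
      printf("input value %d -> ignored\n", src_pos);
    }
  }
  return 0;
}
```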
@@ -1765,6 +1768,8 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,

// computes:
// PageInfo::chunk_row for all pages
+  // Note: this is doing some redundant work for pages in flat hierarchies. chunk_row has already
+  // been computed during header decoding. the overall amount of work here is very small though.
auto key_input = thrust::make_transform_iterator(
pages.device_ptr(), [] __device__(PageInfo const& page) { return page.chunk_idx; });
auto page_input = thrust::make_transform_iterator(
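The chunk_row computation set up by these iterators is an exclusive scan of per-page row counts keyed by chunk index; a hedged Thrust sketch with a simplified stand-in for PageInfo:

```cpp
#include <thrust/device_vector.h>
#include <thrust/scan.h>
#include <cstdio>
#include <vector>

int main()
{
  // five pages: chunk 0 has three, chunk 1 has two (stand-ins for PageInfo)
  std::vector<int> h_chunk = {0, 0, 0, 1, 1};
  std::vector<int> h_rows  = {100, 100, 50, 200, 200};
  thrust::device_vector<int> chunk_idx(h_chunk.begin(), h_chunk.end());
  thrust::device_vector<int> num_rows(h_rows.begin(), h_rows.end());
  thrust::device_vector<int> chunk_row(h_chunk.size());

  // a page's starting row within its chunk = sum of the preceding pages' rows
  thrust::exclusive_scan_by_key(
    chunk_idx.begin(), chunk_idx.end(), num_rows.begin(), chunk_row.begin());

  for (size_t i = 0; i < chunk_row.size(); i++) {
    printf("page %zu: chunk_row=%d\n", i, static_cast<int>(chunk_row[i]));
  }  // 0, 100, 200, 0, 200
  return 0;
}
```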
@@ -1840,43 +1845,36 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
return page.nesting[l_idx].size;
});

-    // compute column size.
    // if this buffer is part of a list hierarchy, we need to determine its
    // final size and allocate it here.
    //
    // for struct columns, higher levels of the output columns are shared between input
    // columns. so don't compute any given level more than once.
-    if (out_buf.size == 0) {
+    if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && out_buf.size == 0) {
int size = thrust::reduce(rmm::exec_policy(stream), size_input, size_input + pages.size());

-      // Handle a specific corner case. It is possible to construct a parquet file such that
-      // a column within a row group contains more rows than the row group itself. This may be
-      // invalid, but we have seen instances of this in the wild, including how they were created
-      // using the apache parquet tools. Normally, the trim pass would handle this case quietly,
-      // but if we are not running the trim pass (which is most of the time) we need to cap the
-      // number of rows we will allocate/read from the file with the amount specified in the
-      // associated row group. This only applies to columns that are not children of lists as
-      // those may have an arbitrary number of rows in them.
-      if (!uses_custom_row_bounds &&
-          !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) &&
-          size > static_cast<size_type>(num_rows)) {
-        size = static_cast<size_type>(num_rows);
-      }
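For reference, the capping rule this hunk touches, condensed into a hypothetical host-side function. After this PR it appears to be redundant here: non-list columns are allocated directly from the row group's num_rows on the host (see the reader_impl.cu hunk below), and list children were never capped in the first place.

```cpp
#include <cstdio>

int capped_size(int page_size_sum,       // sum of per-page value counts
                int row_group_num_rows,  // rows declared by the row group
                bool uses_custom_row_bounds,
                bool has_list_parent)
{
  // a malformed (but observed-in-the-wild) file can declare more rows in a
  // column chunk than its row group holds; without a trim pass, cap the
  // allocation for columns that are not children of lists
  if (!uses_custom_row_bounds && !has_list_parent && page_size_sum > row_group_num_rows) {
    return row_group_num_rows;
  }
  return page_size_sum;
}

int main()
{
  printf("%d\n", capped_size(1005, 1000, false, false));  // 1000: capped
  printf("%d\n", capped_size(1005, 1000, true, false));   // 1005: trim pass will handle it
  printf("%d\n", capped_size(5000, 1000, false, true));   // 5000: list child, any count allowed
  return 0;
}
```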

// if this is a list column add 1 for non-leaf levels for the terminating offset
if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }

// allocate
out_buf.create(size, stream, mr);
}
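The `size++` for non-leaf list levels exists because n list rows need n + 1 offsets; row i spans [offsets[i], offsets[i+1]). A minimal illustration of that layout:

```cpp
#include <cstdio>

int main()
{
  // three list rows: [10, 20], [], [30] -> four offsets
  int const offsets[4] = {0, 2, 2, 3};
  int const values[3]  = {10, 20, 30};
  for (int row = 0; row < 3; row++) {
    printf("row %d:", row);
    for (int i = offsets[row]; i < offsets[row + 1]; i++) { printf(" %d", values[i]); }
    printf("\n");
  }
  return 0;
}
```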

-    // compute per-page start offset
-    thrust::exclusive_scan_by_key(rmm::exec_policy(stream),
-                                  page_keys.begin(),
-                                  page_keys.end(),
-                                  size_input,
-                                  start_offset_output_iterator{pages.device_ptr(),
-                                                               page_index.begin(),
-                                                               0,
-                                                               static_cast<int>(src_col_schema),
-                                                               static_cast<int>(l_idx)});
+    // for nested hierarchies, compute per-page start offset.
+    // it would be better/safer to be checking (schema.max_repetition_level > 0) here, but there's
+    // no easy way to get at that info here. we'd have to move this function into reader_impl.cu
Comment on lines +1864 to +1865

Contributor: Should there be an issue for this?

Contributor Author: You were right. This ended up being a bug :) If you have a struct at the top of a nested hierarchy, this logic fails. The max_repetition_level check is the correct one.

+    if ((out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) ||
+        out_buf.type.id() == type_id::LIST) {
+      thrust::exclusive_scan_by_key(rmm::exec_policy(stream),
+                                    page_keys.begin(),
+                                    page_keys.end(),
+                                    size_input,
+                                    start_offset_output_iterator{pages.device_ptr(),
+                                                                 page_index.begin(),
+                                                                 0,
+                                                                 static_cast<int>(src_col_schema),
+                                                                 static_cast<int>(l_idx)});
+    }
}
}
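The guard added above (and the condition the review thread discusses) comes down to a predicate over each output level; a hedged sketch with hypothetical types, mirroring the diff's condition:

```cpp
#include <cstdint>
#include <cstdio>

constexpr uint32_t HAS_LIST_PARENT = 1u << 0;  // hypothetical flag bit

struct OutputLevel {
  uint32_t user_data;
  bool is_list;  // stand-in for out_buf.type.id() == type_id::LIST
};

// per-page start offsets are needed if this level is a list or sits below
// one; a struct at the top of a nested hierarchy (not a list, no list
// parent) is excluded -- hence the two-part check
bool needs_page_offsets(OutputLevel const& lvl)
{
  return (lvl.user_data & HAS_LIST_PARENT) || lvl.is_list;
}

int main()
{
  OutputLevel top_struct{0, false};          // struct<list<int>>: the struct itself
  OutputLevel the_list{0, true};             // the list level
  OutputLevel leaf{HAS_LIST_PARENT, false};  // int leaf under the list
  printf("struct=%d list=%d leaf=%d\n",
         needs_page_offsets(top_struct),  // 0
         needs_page_offsets(the_list),    // 1
         needs_page_offsets(leaf));       // 1
  return 0;
}
```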

59 changes: 33 additions & 26 deletions cpp/src/io/parquet/reader_impl.cu
@@ -1353,26 +1353,39 @@ void reader::impl::preprocess_columns(hostdevice_vector<gpu::ColumnChunkDesc>& c
hostdevice_vector<gpu::PageInfo>& pages,
size_t min_row,
size_t total_rows,
-                                       bool uses_custom_row_bounds,
-                                       bool has_lists)
+                                       bool uses_custom_row_bounds)
{
-  // TODO : we should be selectively preprocessing only columns that have
-  // lists in them instead of doing them all if even one contains lists.
-
-  // if there are no lists, simply allocate every allocate every output
-  // column to be of size num_rows
-  if (!has_lists) {
-    std::function<void(std::vector<column_buffer>&)> create_columns =
-      [&](std::vector<column_buffer>& cols) {
-        for (size_t idx = 0; idx < cols.size(); idx++) {
-          auto& col = cols[idx];
-          col.create(total_rows, _stream, _mr);
-          create_columns(col.children);
-        }
-      };
-    create_columns(_output_columns);
-  } else {
-    // preprocess per-nesting level sizes by page
+  // iterate over all input columns and allocate any associated output
+  // buffers if they are not part of a list hierarchy. mark down
+  // if we have any list columns that need further processing.
+  bool has_lists = false;
+  for (size_t idx = 0; idx < _input_columns.size(); idx++) {
+    auto const& input_col  = _input_columns[idx];
+    size_t const max_depth = input_col.nesting_depth();
+
+    auto* cols = &_output_columns;
+    for (size_t l_idx = 0; l_idx < max_depth; l_idx++) {
+      auto& out_buf = (*cols)[input_col.nesting[l_idx]];
+      cols          = &out_buf.children;
+
+      // if this has a list parent, we will have to do further work in gpu::PreprocessColumnData
+      // to know how big this buffer actually is.
+      if (out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) {
+        has_lists = true;
+      }
+      // if we haven't already processed this column because it is part of a struct hierarchy
+      else if (out_buf.size == 0) {
+        // add 1 for the offset if this is a list column
+        out_buf.create(
+          out_buf.type.id() == type_id::LIST && l_idx < max_depth ? total_rows + 1 : total_rows,
+          _stream,
+          _mr);
+      }
+    }
+  }
+
+  // if we have columns containing lists, further preprocessing is necessary.
+  if (has_lists) {
gpu::PreprocessColumnData(pages,
chunks,
_input_columns,
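The new allocation walk in preprocess_columns descends the output-buffer tree one level at a time using each input column's nesting indices; a simplified, hypothetical sketch of the traversal and the has_lists detection:

```cpp
#include <cstdio>
#include <vector>

struct ColumnBuffer {
  bool has_list_parent = false;  // stand-in for the HAS_LIST_PARENT flag
  std::vector<ColumnBuffer> children;
};

struct InputColumn {
  std::vector<int> nesting;  // child index at each output nesting level
};

// walk one input column's path through the output tree and report whether
// any level on the path sits under a list (and so needs GPU preprocessing)
bool detect_lists(std::vector<ColumnBuffer>& output_columns, InputColumn const& col)
{
  bool has_lists = false;
  auto* cols     = &output_columns;
  for (size_t l_idx = 0; l_idx < col.nesting.size(); l_idx++) {
    auto& out_buf = (*cols)[col.nesting[l_idx]];
    cols          = &out_buf.children;
    if (out_buf.has_list_parent) { has_lists = true; }
  }
  return has_lists;
}

int main()
{
  // one top-level column: list<int> -> level 0 is the list, level 1 the leaf
  std::vector<ColumnBuffer> out(1);
  out[0].children.resize(1);
  out[0].children[0].has_list_parent = true;

  InputColumn col{{0, 0}};
  printf("has_lists=%d\n", detect_lists(out, col));  // 1
  return 0;
}
```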
@@ -1636,9 +1649,6 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Keep track of column chunk file offsets
std::vector<size_t> column_chunk_offsets(num_chunks);

-  // if there are lists present, we need to preprocess
-  bool has_lists = false;
-
// Initialize column chunk information
size_t total_decompressed_size = 0;
auto remaining_rows = num_rows;
@@ -1657,9 +1667,6 @@
auto& col_meta = _metadata->get_column_metadata(rg.index, rg.source_index, col.schema_idx);
auto& schema = _metadata->get_schema(col.schema_idx);

-      // this column contains repetition levels and will require a preprocess
-      if (schema.max_repetition_level > 0) { has_lists = true; }
-
auto [type_width, clock_rate, converted_type] =
conversion_info(to_type_id(schema, _strings_to_categorical, _timestamp_type.id()),
_timestamp_type.id(),
@@ -1755,7 +1762,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
//
// - for nested schemas, output buffer offset values per-page, per nesting-level for the
// purposes of decoding.
-    preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds, has_lists);
+    preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds);

// decoding of column data itself
decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows);
6 changes: 2 additions & 4 deletions cpp/src/io/parquet/reader_impl.hpp
@@ -148,7 +148,7 @@ class reader::impl {
hostdevice_vector<gpu::PageNestingInfo>& page_nesting_info);

/**
-   * @brief Preprocess column information for nested schemas.
+   * @brief Preprocess column information and allocate output buffers.
*
* There are several pieces of information we can't compute directly from row counts in
* the parquet headers when dealing with nested schemas.
@@ -163,15 +163,13 @@
* @param total_rows Maximum number of rows to read
* @param uses_custom_row_bounds Whether or not num_rows and min_rows represent user-specific
* bounds
-   * @param has_lists Whether or not this data contains lists and requires
-   * a preprocess.
*/
void preprocess_columns(hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
hostdevice_vector<gpu::PageInfo>& pages,
size_t min_row,
size_t total_rows,
-                          bool uses_custom_row_bounds,
-                          bool has_lists);
+                          bool uses_custom_row_bounds);

/**
* @brief Converts the page data and outputs to columns.