Skip to content

Commit

Permalink
Fix invalid memory access in Parquet reader (rapidsai#14637)
Browse files Browse the repository at this point in the history
Fixes rapidsai#14633

When reading files in multiple passes, some pointer fields in `ColumnChunkDesc` that point to transient memory are not cleared out at the end of each pass. This can lead to trying to dereference deallocated memory during Parquet reader string preprocessing.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: rapidsai#14637
  • Loading branch information
etseidl authored and abellina committed Dec 28, 2023
1 parent d830ab0 commit 7184376
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 27 deletions.
11 changes: 9 additions & 2 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageDataFixed(

// TODO: abellina all_types_filter???
//if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, all_types_filter{}, true)) { return; }
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::FIXED_WIDTH_NO_DICT}, true)) { return; }
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows,
mask_filter{decode_kernel_mask::FIXED_WIDTH_NO_DICT},
page_processing_stage::DECODE)) {
return;
}

// must come after the kernel mask check
[[maybe_unused]] null_count_back_copier _{s, t};
Expand Down Expand Up @@ -420,7 +424,10 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageDataFixedDict(
if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT))) { return; }

// TODO: abellina all_types_filter???
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::FIXED_WIDTH_DICT}, true)) { return; }
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows,
mask_filter{decode_kernel_mask::FIXED_WIDTH_DICT},
page_processing_stage::DECODE)) { return; }

#ifdef ABDEBUG
if (t == 0) {
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/decode_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ __global__ void __launch_bounds__(preprocess_block_size)
decoders[level_type::NUM_LEVEL_TYPES] = {{def_runs}, {rep_runs}};

// setup page info
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, all_types_filter{}, false)) {
return;
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows, all_types_filter{}, page_processing_stage::PREPROCESS)) {
return;
}

// initialize the stream decoders (requires values computed in setupLocalPageInfo)
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ __global__ void __launch_bounds__(decode_block_size)
min_row,
num_rows,
mask_filter{decode_kernel_mask::GENERAL},
true)) {
page_processing_stage::DECODE)) {
return;
}

Expand Down
23 changes: 15 additions & 8 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,12 @@ struct mask_filter {
}
};

enum class page_processing_stage {
PREPROCESS,
STRING_BOUNDS,
DECODE,
};

/**
* @brief Sets up block-local page state information from the global pages.
*
Expand All @@ -1011,7 +1017,7 @@ struct mask_filter {
* @param[in] min_row Crop all rows below min_row
* @param[in] num_rows Maximum number of rows to read
* @param[in] filter Filtering function used to decide which pages to operate on
* @param[in] is_decode_step If we are setting up for the decode step (instead of the preprocess)
* @param[in] stage What stage of the decoding process is this being called from
* @tparam Filter Function that takes a PageInfo reference and returns true if the given page should
* be operated on Currently only used by gpuComputePageSizes step)
* @return True if this page should be processed further
Expand All @@ -1023,7 +1029,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
size_t min_row,
size_t num_rows,
Filter filter,
bool is_decode_step)
page_processing_stage stage)
{
int t = threadIdx.x;

Expand Down Expand Up @@ -1112,7 +1118,8 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
//
// NOTE: this check needs to be done after the null counts have been zeroed out
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
if (is_decode_step && s->num_rows == 0 &&
if ((stage == page_processing_stage::STRING_BOUNDS || stage == page_processing_stage::DECODE) &&
s->num_rows == 0 &&
!(has_repetition && (is_bounds_page(s, min_row, num_rows, has_repetition) ||
is_page_contained(s, min_row, num_rows)))) {
return false;
Expand Down Expand Up @@ -1224,7 +1231,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
// NOTE: in a chunked read situation, s->col.column_data_base and s->col.valid_map_base
// will be aliased to memory that has been freed when we get here in the non-decode step, so
// we cannot check against nullptr. we'll just check a flag directly.
if (is_decode_step) {
if (stage == page_processing_stage::DECODE) {
int max_depth = s->col.max_nesting_depth;
for (int idx = 0; idx < max_depth; idx++) {
PageNestingDecodeInfo* nesting_info = &s->nesting_info[idx];
Expand Down Expand Up @@ -1383,13 +1390,13 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,

// if we're in the decoding step, jump directly to the first
// value we care about
if (is_decode_step) {
if (stage == page_processing_stage::DECODE) {
s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0;
} else {
} else if (stage == page_processing_stage::PREPROCESS) {
s->input_value_count = 0;
s->input_leaf_count = 0;
s->page.skipped_values =
-1; // magic number to indicate it hasn't been set for use inside UpdatePageSizes
// magic number to indicate it hasn't been set for use inside UpdatePageSizes
s->page.skipped_values = -1;
s->page.skipped_leaf_values = 0;
}
}
Expand Down
20 changes: 14 additions & 6 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,13 @@ __global__ void __launch_bounds__(96)
auto* const db = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_BINARY;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BINARY},
page_processing_stage::DECODE)) {
return;
}

Expand Down Expand Up @@ -446,9 +450,13 @@ __global__ void __launch_bounds__(decode_block_size)
auto* const dba = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY},
page_processing_stage::DECODE)) {
return;
}

Expand Down
40 changes: 32 additions & 8 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,15 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBou

// setup page info
auto const mask = BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY);
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{mask},
page_processing_stage::STRING_BOUNDS)) {
return;
}

bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);

Expand Down Expand Up @@ -658,8 +666,15 @@ __global__ void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPageS
bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;

// setup page info
auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::DELTA_BYTE_ARRAY},
page_processing_stage::STRING_BOUNDS)) {
return;
}

auto const start_value = pp->start_val;

Expand Down Expand Up @@ -721,8 +736,13 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;

// setup page info
if (!setupLocalPageInfo(
s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::STRING}, true)) {
if (!setupLocalPageInfo(s,
pp,
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::STRING},
page_processing_stage::STRING_BOUNDS)) {
return;
}

Expand Down Expand Up @@ -815,9 +835,13 @@ __global__ void __launch_bounds__(decode_block_size)
int const lane_id = t % warp_size;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::STRING;
if (!setupLocalPageInfo(
s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{decode_kernel_mask::STRING},
page_processing_stage::DECODE)) {
return;
}

Expand Down

0 comments on commit 7184376

Please sign in to comment.