Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address potential race conditions in Parquet reader #14602

Merged
merged 20 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ struct delta_binary_decoder {

// need to account for the first value from header on first pass
if (current_value_idx == 0) {
// make sure all threads have read current_value_idx above before lane 0 increments it
__syncwarp();
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
if (lane_id == 0) { current_value_idx++; }
__syncwarp();
if (current_value_idx >= value_count) { return; }
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand All @@ -495,6 +495,10 @@ __global__ void __launch_bounds__(decode_block_size)
uint32_t src_target_pos = target_pos + skipped_leaf_values;

// WARP1: Decode dictionary indices, booleans or string positions
// NOTE: racecheck complains of a RAW error involving the s->dict_pos assignment below.
// This is likely a false positive in practice, but could be solved by wrapping the next
// 9 lines in `if (s->dict_pos < src_target_pos) {}`. If that change is made here, it will
// be needed in the other DecodeXXX kernels.
if (s->dict_base) {
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, t & 0x1f).first;
} else if ((s->col.data_type & 7) == BOOLEAN) {
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ __device__ cuda::std::pair<int, int> gpuDecodeDictionaryIndices(page_state_s* s,
int pos = s->dict_pos;
int str_len = 0;

// NOTE: racecheck warns about a RAW involving s->dict_pos, which is likely a false positive
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something along these lines?

Suggested change
// NOTE: racecheck warns about a RAW involving s->dict_pos, which is likely a false positive
// NOTE: racecheck warns about a RAW involving s->dict_pos, which is likely a false positive because the only path that does not include a sync will lead to s->dict_pos being overwritten with the same value

// because the only path that does not include a sync will lead to s->dict_pos being overwritten
// with the same value

while (pos < target_pos) {
int is_literal, batch_len;
if (!t) {
Expand Down Expand Up @@ -357,6 +361,10 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t
uint8_t const* end = s->data_end;
int64_t pos = s->dict_pos;

// NOTE: racecheck warns about a RAW involving s->dict_pos, which is likely a false positive
// because the only path that does not include a sync will lead to s->dict_pos being overwritten
// with the same value

while (pos < target_pos) {
int is_literal, batch_len;
if (!t) {
Expand Down Expand Up @@ -549,6 +557,9 @@ __device__ void gpuDecodeStream(
batch_coded_count += batch_len;
value_count += batch_len;
}
// issue #14597
// racecheck reported a race between the reads at the start of this function and the writes below
__syncwarp();

// update the stream info
if (!t) {
Expand Down Expand Up @@ -681,6 +692,9 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
level_t const* const def,
int t)
{
// exit early if there's no work to do
if (s->input_value_count >= target_input_value_count) { return; }

// max nesting depth of the column
int const max_depth = s->col.max_nesting_depth;
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ __global__ void __launch_bounds__(96)
} else { // warp2
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 2 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
Expand Down Expand Up @@ -507,7 +507,7 @@ __global__ void __launch_bounds__(decode_block_size)
} else { // warp 3
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of prefixes, warp 2 will
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,

// can skip all this if we know there are no nulls
if (max_def == 0 && !is_bounds_pg) {
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
if (t == 0) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
}
return {0, s->num_input_values};
}

Expand Down Expand Up @@ -294,7 +296,6 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
pp->num_nulls = null_count;
pp->num_valids = pp->num_input_values - null_count;
}
__syncthreads();
Copy link
Contributor

@nvdbaranec nvdbaranec Dec 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems dangerous to remove. Aren't all threads except 0 in danger of using the wrong pp->num_nulls value right below?

Copy link
Contributor Author

@etseidl etseidl Dec 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another only-valid-on-thread-0 result. I originally added more syncthreads before all the other returns, but @vuule pointed out that once this function returns, all that happens is thread 0 takes the return values and copies them to global memory (along with 2 shared mem fields) and then returns. The other threads simply return ignored garbage and exit.

Actually, I should probably move this entire function into gpuComputeStringPageBounds, which would make the above more obvious. It made sense as a standalone function when it was part of the gpuComputePageStringSizes kernel (and back then the syncthreads was necessary), but now that it's its own kernel, there's no need for it.


end_value -= pp->num_nulls;
}
Expand Down Expand Up @@ -848,7 +849,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it
vuule marked this conversation as resolved.
Show resolved Hide resolved
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down
Loading