Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address potential race conditions in Parquet reader #14602

Merged
merged 20 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ struct delta_binary_decoder {

// need to account for the first value from header on first pass
if (current_value_idx == 0) {
// make sure all threads access current_value_idx above before incrementing
__syncwarp();
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
if (lane_id == 0) { current_value_idx++; }
__syncwarp();
if (current_value_idx >= value_count) { return; }
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ __device__ cuda::std::pair<int, int> gpuDecodeDictionaryIndices(page_state_s* s,

pos += batch_len;
}
// racecheck wants a sync here
__syncwarp();
return {pos, str_len};
}

Expand Down Expand Up @@ -549,6 +551,9 @@ __device__ void gpuDecodeStream(
batch_coded_count += batch_len;
value_count += batch_len;
}
// issue #14597
// racecheck reported race between reads at the start of this function and the writes below
__syncwarp();

// update the stream info
if (!t) {
Expand Down Expand Up @@ -691,6 +696,10 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;

// need this to ensure input_value_count is read by all threads before s->input_value_count
// is modified below (just in case input_value count >= target_input_value_count).
__syncwarp();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I'm not so sure about needing. In the worst case, thead 0 sets the local var, skips the loop (and the syncwarp within it) and then overwrites the shared value before other threads read it. But in that case it will just overwrite with the same value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we actually need to update s->nz_count, s->input_value_count and s->input_row_count if we never enter the loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking no...they shouldn't have changed if the loop wasn't entered. But I'll admit this is one of the parts of the parquet code that I understand the least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, we should be able to return early if initially input_value_count >= target_input_value_count, right?
That would simplify the logic and prevent the tool from reporting the race condition.
CC @nvdbaranec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the change and verified that racecheck is happy


// process until we've reached the target
while (input_value_count < target_input_value_count) {
// determine the nesting bounds for this thread (the range of nesting depths we
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ __global__ void __launch_bounds__(96)
} else { // warp2
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 2 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
Expand Down Expand Up @@ -507,7 +507,7 @@ __global__ void __launch_bounds__(decode_block_size)
} else { // warp 3
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of prefixes, warp 2 will
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,

// can skip all this if we know there are no nulls
if (max_def == 0 && !is_bounds_pg) {
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
if (t == 0) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
}
__syncthreads();
return {0, s->num_input_values};
}

Expand Down Expand Up @@ -268,6 +271,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
pp->num_nulls = num_nulls;
pp->num_valids = total_leaf_values;
}
__syncthreads();
}
// already filtered out unwanted pages, so need to count all non-null values in this page
else {
Expand Down
Loading