Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address potential race conditions in Parquet reader #14602

Merged
merged 20 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ struct delta_binary_decoder {

// need to account for the first value from header on first pass
if (current_value_idx == 0) {
// make sure all threads access current_value_idx above before incrementing
__syncwarp();
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
if (lane_id == 0) { current_value_idx++; }
__syncwarp();
if (current_value_idx >= value_count) { return; }
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ __device__ cuda::std::pair<int, int> gpuDecodeDictionaryIndices(page_state_s* s,
int pos = s->dict_pos;
int str_len = 0;

// ensure all threads read s->dict_pos before returning
__syncwarp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this one. The return value from this function is explicitly stated to only be valid on thread 0. Looking at all the call sites, it's always thread 0 that actually does any work with the value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is kind of like the one in gpuUpdateValidityOffsetsAndRowIndices, except here the assignment back to s->dict_pos is done after this call returns. If the loop is entered, then all threads will hit the syncwarp there. It's only an issue if pos >= target_pos. Given this has worked without problems for quite some time, I can get rid of this and the one in gpuDecodeRleBooleans.


while (pos < target_pos) {
int is_literal, batch_len;
if (!t) {
Expand Down Expand Up @@ -357,6 +360,11 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t
uint8_t const* end = s->data_end;
int64_t pos = s->dict_pos;

// ensure all threads read s->dict_pos before returning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as the one in gpuDecodeDictionaryIndices

// NOTE: Removing this does not trigger any race warnings, but is similar to access pattern
// in gpuDecodeDictionaryIndices which does.
__syncwarp();

while (pos < target_pos) {
int is_literal, batch_len;
if (!t) {
Expand Down Expand Up @@ -426,6 +434,11 @@ gpuInitStringDescriptors(page_state_s* s, [[maybe_unused]] state_buf* sb, int ta
int pos = s->dict_pos;
int total_len = 0;

// ensure all threads read s->dict_pos before returning
// NOTE: Removing this does not trigger any race warnings, but is similar to access pattern
// in gpuDecodeDictionaryIndices which does.
__syncwarp();

// This step is purely serial
if (!t) {
uint8_t const* cur = s->data_start;
Expand Down Expand Up @@ -549,6 +562,9 @@ __device__ void gpuDecodeStream(
batch_coded_count += batch_len;
value_count += batch_len;
}
// issue #14597
// racecheck reported race between reads at the start of this function and the writes below
__syncwarp();

// update the stream info
if (!t) {
Expand Down Expand Up @@ -691,6 +707,10 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;

// need this to ensure input_value_count is read by all threads before s->input_value_count
// is modified below (just in case input_value count >= target_input_value_count).
__syncwarp();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I'm not so sure about needing. In the worst case, thead 0 sets the local var, skips the loop (and the syncwarp within it) and then overwrites the shared value before other threads read it. But in that case it will just overwrite with the same value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we actually need to update s->nz_count, s->input_value_count and s->input_row_count if we never enter the loop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking no...they shouldn't have changed if the loop wasn't entered. But I'll admit this is one of the parts of the parquet code that I understand the least.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, we should be able to return early if initially input_value_count >= target_input_value_count, right?
That would simplify the logic and prevent the tool from reporting the race condition.
CC @nvdbaranec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the change and verified that racecheck is happy


// process until we've reached the target
while (input_value_count < target_input_value_count) {
// determine the nesting bounds for this thread (the range of nesting depths we
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ __global__ void __launch_bounds__(96)
} else { // warp2
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 2 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
Expand Down Expand Up @@ -507,7 +507,7 @@ __global__ void __launch_bounds__(decode_block_size)
} else { // warp 3
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of prefixes, warp 2 will
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,

// can skip all this if we know there are no nulls
if (max_def == 0 && !is_bounds_pg) {
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
if (t == 0) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
}
__syncthreads();
return {0, s->num_input_values};
}

Expand Down Expand Up @@ -134,6 +137,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
pp->num_nulls = 0;
pp->num_valids = end_row - begin_row;
}
__syncthreads();
return {begin_row, end_row};
}

Expand Down Expand Up @@ -268,6 +272,7 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
pp->num_nulls = num_nulls;
pp->num_valids = total_leaf_values;
}
__syncthreads();
}
// already filtered out unwanted pages, so need to count all non-null values in this page
else {
Expand Down Expand Up @@ -848,7 +853,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it
vuule marked this conversation as resolved.
Show resolved Hide resolved
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down