Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address potential race conditions in Parquet reader #14602

Merged
merged 20 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ struct delta_binary_decoder {

// need to account for the first value from header on first pass
if (current_value_idx == 0) {
// make sure all threads access current_value_idx above before incrementing
__syncwarp();
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
if (lane_id == 0) { current_value_idx++; }
__syncwarp();
if (current_value_idx >= value_count) { return; }
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ __device__ cuda::std::pair<int, int> gpuDecodeDictionaryIndices(page_state_s* s,
int pos = s->dict_pos;
int str_len = 0;

// ensure all threads read s->dict_pos before returning
__syncwarp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this one. The return value from this function is explicitly stated to only be valid on thread 0. Looking at all the call sites, it's always thread 0 that actually does any work with the value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is kind of like the one in gpuUpdateValidityOffsetsAndRowIndices, except here the assignment back to s->dict_pos is done after this call returns. If the loop is entered, then all threads will hit the syncwarp there. It's only an issue if pos >= target_pos. Given this has worked without problems for quite some time, I can get rid of this and the one in gpuDecodeRleBooleans.


while (pos < target_pos) {
int is_literal, batch_len;
if (!t) {
Expand Down Expand Up @@ -357,6 +360,9 @@ inline __device__ int gpuDecodeRleBooleans(page_state_s* s, state_buf* sb, int t
uint8_t const* end = s->data_end;
int64_t pos = s->dict_pos;

// ensure all threads read s->dict_pos before returning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as the one in gpuDecodeDictionaryIndices

__syncwarp();

while (pos < target_pos) {
int is_literal, batch_len;
if (!t) {
Expand Down Expand Up @@ -549,6 +555,9 @@ __device__ void gpuDecodeStream(
batch_coded_count += batch_len;
value_count += batch_len;
}
// issue #14597
// racecheck reported race between reads at the start of this function and the writes below
__syncwarp();

// update the stream info
if (!t) {
Expand Down Expand Up @@ -681,6 +690,9 @@ __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_input_value
level_t const* const def,
int t)
{
// exit early if there's no work to do
if (s->input_value_count >= target_input_value_count) { return; }

// max nesting depth of the column
int const max_depth = s->col.max_nesting_depth;
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ __global__ void __launch_bounds__(96)
} else { // warp2
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 2 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
Expand Down Expand Up @@ -507,7 +507,7 @@ __global__ void __launch_bounds__(decode_block_size)
} else { // warp 3
target_pos = min(s->nz_count, src_pos + batch_size);
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 3 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of prefixes, warp 2 will
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,

// can skip all this if we know there are no nulls
if (max_def == 0 && !is_bounds_pg) {
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
if (t == 0) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
s->page.num_valids = s->num_input_values;
s->page.num_nulls = 0;
}
return {0, s->num_input_values};
}

Expand Down Expand Up @@ -294,7 +296,6 @@ __device__ thrust::pair<int, int> page_bounds(page_state_s* const s,
pp->num_nulls = null_count;
pp->num_valids = pp->num_input_values - null_count;
}
__syncthreads();
Copy link
Contributor

@nvdbaranec nvdbaranec Dec 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems dangerous to remove. Aren't all threads except 0 in danger of using the wrong pp->num_nulls value right below?

Copy link
Contributor Author

@etseidl etseidl Dec 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another only-valid-on-thread-0 result. I originally added more syncthreads before all the other returns, but @vuule pointed out that once this function returns, all that happens is thread 0 takes the return values and copies them to global memory (along with 2 shared mem fields) and then returns. The other threads simply return ignored garbage and exit.

Actually, I should probably move this entire function into gpuComputeStringPageBounds, which would make the above more obvious. It made sense to be a standalone when it was part of the gpuComputePageStringSizes kernel (and back then the syncthreads was necessary), but now that it's its own kernel, there's no need for it.


end_value -= pp->num_nulls;
}
Expand Down Expand Up @@ -848,7 +849,7 @@ __global__ void __launch_bounds__(decode_block_size)
target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
}
// TODO(ets): see if this sync can be removed
// this needs to be here to prevent warp 1/2 modifying src_pos before all threads have read it
vuule marked this conversation as resolved.
Show resolved Hide resolved
__syncthreads();
if (t < 32) {
// decode repetition and definition levels.
Expand Down