From ffbdd2402d1131b9a06cda87b4ef888953b2901a Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 8 May 2024 12:52:16 -0700 Subject: [PATCH] Skip decode steps in Parquet reader when nullable columns have no nulls (#15332) Closes #15266. Some block and warp operations and some atomics can be avoided during Parquet page decoding when nullable columns are known to not contain any null values. One issue, however, is that since the columns are nullable, the null mask has to be set. My approach in this PR was to initialize nullable column buffers with an `ALL_VALID` mask rather than `ALL_NULL`. This will work when nulls are present because `store_validity()` sets the bitmask to all zeros before ORing in the passed `valid_mask`. Benchmarks modified to not emit nulls showed a good improvement in decoding time for fixed-width data types. ``` ## [0] NVIDIA RTX A6000 | data_type | io_type | cardinality | run_length | Ref Time | Ref Noise | Cmp Time | Cmp Noise | Diff | %Diff | Status | |-------------|---------------|---------------|--------------|------------|-------------|------------|-------------|--------------|---------|----------| | INTEGRAL | DEVICE_BUFFER | 0 | 1 | 9.955 ms | 2.31% | 9.361 ms | 3.55% | -594.395 us | -5.97% | FAIL | | INTEGRAL | DEVICE_BUFFER | 1000 | 1 | 9.964 ms | 3.01% | 8.981 ms | 3.98% | -982.965 us | -9.87% | FAIL | | INTEGRAL | DEVICE_BUFFER | 0 | 32 | 10.222 ms | 3.32% | 9.207 ms | 5.47% | -1014.597 us | -9.93% | FAIL | | INTEGRAL | DEVICE_BUFFER | 1000 | 32 | 9.930 ms | 3.83% | 8.607 ms | 3.37% | -1323.326 us | -13.33% | FAIL | | FLOAT | DEVICE_BUFFER | 0 | 1 | 5.999 ms | 3.59% | 5.752 ms | 3.87% | -246.635 us | -4.11% | FAIL | | FLOAT | DEVICE_BUFFER | 1000 | 1 | 6.576 ms | 4.40% | 5.839 ms | 4.43% | -737.338 us | -11.21% | FAIL | | FLOAT | DEVICE_BUFFER | 0 | 32 | 5.828 ms | 4.59% | 4.940 ms | 4.41% | -887.375 us | -15.23% | FAIL | | FLOAT | DEVICE_BUFFER | 1000 | 32 | 6.198 ms | 3.91% | 5.271 ms | 3.54% | -927.602 us | -14.97% | FAIL | | DECIMAL | DEVICE_BUFFER | 0 | 1 | 20.199 ms | 1.66% | 20.014 ms | 2.23% | -184.710 us | -0.91% | PASS | | DECIMAL | DEVICE_BUFFER | 1000 | 1 | 7.068 ms | 3.99% | 6.479 ms | 4.08% | -588.856 us | -8.33% | FAIL | | DECIMAL | DEVICE_BUFFER | 0 | 32 | 9.287 ms | 3.45% | 8.656 ms | 2.94% | -631.348 us | -6.80% | FAIL | | DECIMAL | DEVICE_BUFFER | 1000 | 32 | 5.641 ms | 4.39% | 5.021 ms | 3.31% | -620.122 us | -10.99% | FAIL | | TIMESTAMP | DEVICE_BUFFER | 0 | 1 | 27.488 ms | 1.57% | 27.235 ms | 1.74% | -253.277 us | -0.92% | PASS | | TIMESTAMP | DEVICE_BUFFER | 1000 | 1 | 6.656 ms | 4.73% | 6.049 ms | 4.61% | -607.760 us | -9.13% | FAIL | | TIMESTAMP | DEVICE_BUFFER | 0 | 32 | 9.974 ms | 3.22% | 9.204 ms | 2.84% | -770.247 us | -7.72% | FAIL | | TIMESTAMP | DEVICE_BUFFER | 1000 | 32 | 5.998 ms | 5.17% | 5.203 ms | 3.06% | -794.943 us | -13.25% | FAIL | | DURATION | DEVICE_BUFFER | 0 | 1 | 8.816 ms | 3.61% | 8.538 ms | 3.26% | -278.877 us | -3.16% | PASS | | DURATION | DEVICE_BUFFER | 1000 | 1 | 5.989 ms | 4.76% | 5.446 ms | 4.57% | -542.636 us | -9.06% | FAIL | | DURATION | DEVICE_BUFFER | 0 | 32 | 6.822 ms | 4.96% | 6.042 ms | 3.74% | -779.786 us | -11.43% | FAIL | | DURATION | DEVICE_BUFFER | 1000 | 32 | 5.706 ms | 5.40% | 4.930 ms | 3.39% | -775.607 us | -13.59% | FAIL | | STRING | DEVICE_BUFFER | 0 | 1 | 36.616 ms | 1.74% | 36.483 ms | 1.31% | -132.191 us | -0.36% | PASS | | STRING | DEVICE_BUFFER | 1000 | 1 | 12.006 ms | 4.15% | 11.989 ms | 3.53% | -16.278 us | -0.14% | PASS | | STRING | DEVICE_BUFFER | 0 | 32 | 36.587 ms | 1.99% | 36.514 ms | 1.38% | -73.737 us | -0.20% | PASS | | STRING | DEVICE_BUFFER | 1000 | 32 | 11.235 ms | 4.25% | 11.228 ms | 3.62% | -7.041 us | -0.06% | PASS | | LIST | DEVICE_BUFFER | 0 | 1 | 36.929 ms | 1.88% | 36.988 ms | 1.42% | 59.350 us | 0.16% | PASS | | LIST | DEVICE_BUFFER | 1000 | 1 | 36.510 ms | 1.91% | 36.558 ms | 1.66% | 48.536 us | 0.13% | PASS | | LIST | DEVICE_BUFFER | 0 | 32 | 35.513 ms | 2.00% | 35.490 ms | 1.77% | -23.411 us | -0.07% | PASS | | LIST | DEVICE_BUFFER | 1000 | 32 | 35.755 ms | 1.99% | 35.728 ms | 1.64% | -27.564 us | -0.08% | PASS | | STRUCT | DEVICE_BUFFER | 0 | 1 | 43.456 ms | 1.35% | 43.537 ms | 1.16% | 81.405 us | 0.19% | PASS | | STRUCT | DEVICE_BUFFER | 1000 | 1 | 25.549 ms | 2.54% | 25.698 ms | 1.90% | 149.295 us | 0.58% | PASS | | STRUCT | DEVICE_BUFFER | 0 | 32 | 43.103 ms | 1.87% | 43.019 ms | 1.59% | -84.825 us | -0.20% | PASS | | STRUCT | DEVICE_BUFFER | 1000 | 32 | 23.462 ms | 2.81% | 23.432 ms | 1.88% | -30.434 us | -0.13% | PASS | ``` Authors: - Ed Seidl (https://github.com/etseidl) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/15332 --- cpp/src/io/parquet/decode_fixed.cu | 72 +++++++------------- cpp/src/io/parquet/reader_impl_preprocess.cu | 7 +- cpp/src/io/utilities/column_buffer.cpp | 19 ++++-- cpp/src/io/utilities/column_buffer.hpp | 7 ++ 4 files changed, 52 insertions(+), 53 deletions(-) diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index f3332a23992..bfd89200786 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -31,12 +31,8 @@ constexpr int rolling_buf_size = decode_block_size * 2; constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); template -static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_value_count, - page_state_s* s, - state_buf* sb, - level_t const* const def, - int t, - bool nullable_with_nulls) +static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat( + int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t) { constexpr int num_warps = decode_block_size / cudf::detail::warp_size; constexpr int max_batch_size = num_warps * cudf::detail::warp_size; @@ -63,13 +59,9 @@ static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_v // definition level. only need to process for nullable columns int d = 0; if constexpr (nullable) { - if (nullable_with_nulls) { - d = t < batch_size - ? static_cast(def[rolling_index(value_count + t)]) - : -1; - } else { - d = t < batch_size ? 1 : -1; - } + d = t < batch_size + ? static_cast(def[rolling_index(value_count + t)]) + : -1; } int const thread_value_count = t + 1; @@ -426,17 +418,13 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if this is a nullable column - if (nullable) { - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); - } else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - } + // only need to process definition levels if the column has nulls + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, def, t, nullable_with_nulls); + next_valid_count = + gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for @@ -444,7 +432,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t, false); + processed_count, s, sb, nullptr, t); } __syncthreads(); @@ -547,18 +535,14 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if this is a nullable column - if (nullable) { - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); - } else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - } + // only need to process definition levels if the column has nulls + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); // count of valid items in this batch - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, def, t, nullable_with_nulls); + next_valid_count = + gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for @@ -566,7 +550,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t, false); + processed_count, s, sb, nullptr, t); } __syncthreads(); @@ -671,17 +655,13 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if this is a nullable column - if (nullable) { - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); - } else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - } + // only need to process definition levels if the column has nulls + if (nullable_with_nulls) { + processed_count += def_decoder.decode_next(t); + __syncthreads(); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, def, t, nullable_with_nulls); + next_valid_count = + gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for @@ -689,7 +669,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t, false); + processed_count, s, sb, nullptr, t); } __syncthreads(); diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 55633b97cf4..a5cd7d06536 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1498,8 +1498,10 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses // if we haven't already processed this column because it is part of a struct hierarchy else if (out_buf.size == 0) { // add 1 for the offset if this is a list column - out_buf.create( + // we're going to start null mask as all valid and then turn bits off if necessary + out_buf.create_with_mask( out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows, + cudf::mask_state::ALL_VALID, _stream, _mr); } @@ -1577,7 +1579,8 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; } // allocate - out_buf.create(size, _stream, _mr); + // we're going to start null mask as all valid and then turn bits off if necessary + out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr); } } } diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 5ef43599838..e5d4e1a360f 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -91,9 +91,10 @@ void copy_buffer_data(string_policy const& buff, string_policy& new_buff) } // namespace template -void column_buffer_base::create(size_type _size, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +void column_buffer_base::create_with_mask(size_type _size, + cudf::mask_state null_mask_state, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { size = _size; _mr = mr; @@ -111,11 +112,19 @@ void column_buffer_base::create(size_type _size, default: _data = create_data(type, size, stream, _mr); break; } if (is_nullable) { - _null_mask = cudf::detail::create_null_mask( - size, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), _mr); + _null_mask = + cudf::detail::create_null_mask(size, null_mask_state, rmm::cuda_stream_view(stream), _mr); } } +template +void column_buffer_base::create(size_type _size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + create_with_mask(_size, mask_state::ALL_NULL, stream, mr); +} + template string_policy column_buffer_base::empty_like(string_policy const& input) { diff --git a/cpp/src/io/utilities/column_buffer.hpp b/cpp/src/io/utilities/column_buffer.hpp index ace1396bc09..e6bfae0681a 100644 --- a/cpp/src/io/utilities/column_buffer.hpp +++ b/cpp/src/io/utilities/column_buffer.hpp @@ -115,6 +115,13 @@ class column_buffer_base { // preprocessing steps such as in the Parquet reader void create(size_type _size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); + // like create(), but also takes a `cudf::mask_state` to allow initializing the null mask as + // something other than `ALL_NULL` + void create_with_mask(size_type _size, + cudf::mask_state null_mask_state, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + // Create a new column_buffer that has empty data but with the same basic information as the // input column, including same type, nullability, name, and user_data. static string_policy empty_like(string_policy const& input);