Skip to content

Commit

Permalink
Skip decode steps in Parquet reader when nullable columns have no nul…
Browse files Browse the repository at this point in the history
…ls (#15332)

Closes #15266.

Some block and warp operations and some atomics can be avoided during Parquet page decoding when nullable columns are known to not contain any null values. One issue, however, is that since the columns are nullable, the null mask has to be set. My approach in this PR was to initialize nullable column buffers with an `ALL_VALID` mask rather than `ALL_NULL`. This will work when nulls are present because `store_validity()` sets the bitmask to all zeros before ORing in the passed `valid_mask`.

Benchmarks modified to not emit nulls showed a good improvement in decoding time for fixed-width data types.
```
## [0] NVIDIA RTX A6000

|  data_type  |    io_type    |  cardinality  |  run_length  |   Ref Time |   Ref Noise |   Cmp Time |   Cmp Noise |         Diff |   %Diff |  Status  |
|-------------|---------------|---------------|--------------|------------|-------------|------------|-------------|--------------|---------|----------|
|  INTEGRAL   | DEVICE_BUFFER |       0       |      1       |   9.955 ms |       2.31% |   9.361 ms |       3.55% |  -594.395 us |  -5.97% |   FAIL   |
|  INTEGRAL   | DEVICE_BUFFER |     1000      |      1       |   9.964 ms |       3.01% |   8.981 ms |       3.98% |  -982.965 us |  -9.87% |   FAIL   |
|  INTEGRAL   | DEVICE_BUFFER |       0       |      32      |  10.222 ms |       3.32% |   9.207 ms |       5.47% | -1014.597 us |  -9.93% |   FAIL   |
|  INTEGRAL   | DEVICE_BUFFER |     1000      |      32      |   9.930 ms |       3.83% |   8.607 ms |       3.37% | -1323.326 us | -13.33% |   FAIL   |
|    FLOAT    | DEVICE_BUFFER |       0       |      1       |   5.999 ms |       3.59% |   5.752 ms |       3.87% |  -246.635 us |  -4.11% |   FAIL   |
|    FLOAT    | DEVICE_BUFFER |     1000      |      1       |   6.576 ms |       4.40% |   5.839 ms |       4.43% |  -737.338 us | -11.21% |   FAIL   |
|    FLOAT    | DEVICE_BUFFER |       0       |      32      |   5.828 ms |       4.59% |   4.940 ms |       4.41% |  -887.375 us | -15.23% |   FAIL   |
|    FLOAT    | DEVICE_BUFFER |     1000      |      32      |   6.198 ms |       3.91% |   5.271 ms |       3.54% |  -927.602 us | -14.97% |   FAIL   |
|   DECIMAL   | DEVICE_BUFFER |       0       |      1       |  20.199 ms |       1.66% |  20.014 ms |       2.23% |  -184.710 us |  -0.91% |   PASS   |
|   DECIMAL   | DEVICE_BUFFER |     1000      |      1       |   7.068 ms |       3.99% |   6.479 ms |       4.08% |  -588.856 us |  -8.33% |   FAIL   |
|   DECIMAL   | DEVICE_BUFFER |       0       |      32      |   9.287 ms |       3.45% |   8.656 ms |       2.94% |  -631.348 us |  -6.80% |   FAIL   |
|   DECIMAL   | DEVICE_BUFFER |     1000      |      32      |   5.641 ms |       4.39% |   5.021 ms |       3.31% |  -620.122 us | -10.99% |   FAIL   |
|  TIMESTAMP  | DEVICE_BUFFER |       0       |      1       |  27.488 ms |       1.57% |  27.235 ms |       1.74% |  -253.277 us |  -0.92% |   PASS   |
|  TIMESTAMP  | DEVICE_BUFFER |     1000      |      1       |   6.656 ms |       4.73% |   6.049 ms |       4.61% |  -607.760 us |  -9.13% |   FAIL   |
|  TIMESTAMP  | DEVICE_BUFFER |       0       |      32      |   9.974 ms |       3.22% |   9.204 ms |       2.84% |  -770.247 us |  -7.72% |   FAIL   |
|  TIMESTAMP  | DEVICE_BUFFER |     1000      |      32      |   5.998 ms |       5.17% |   5.203 ms |       3.06% |  -794.943 us | -13.25% |   FAIL   |
|  DURATION   | DEVICE_BUFFER |       0       |      1       |   8.816 ms |       3.61% |   8.538 ms |       3.26% |  -278.877 us |  -3.16% |   PASS   |
|  DURATION   | DEVICE_BUFFER |     1000      |      1       |   5.989 ms |       4.76% |   5.446 ms |       4.57% |  -542.636 us |  -9.06% |   FAIL   |
|  DURATION   | DEVICE_BUFFER |       0       |      32      |   6.822 ms |       4.96% |   6.042 ms |       3.74% |  -779.786 us | -11.43% |   FAIL   |
|  DURATION   | DEVICE_BUFFER |     1000      |      32      |   5.706 ms |       5.40% |   4.930 ms |       3.39% |  -775.607 us | -13.59% |   FAIL   |
|   STRING    | DEVICE_BUFFER |       0       |      1       |  36.616 ms |       1.74% |  36.483 ms |       1.31% |  -132.191 us |  -0.36% |   PASS   |
|   STRING    | DEVICE_BUFFER |     1000      |      1       |  12.006 ms |       4.15% |  11.989 ms |       3.53% |   -16.278 us |  -0.14% |   PASS   |
|   STRING    | DEVICE_BUFFER |       0       |      32      |  36.587 ms |       1.99% |  36.514 ms |       1.38% |   -73.737 us |  -0.20% |   PASS   |
|   STRING    | DEVICE_BUFFER |     1000      |      32      |  11.235 ms |       4.25% |  11.228 ms |       3.62% |    -7.041 us |  -0.06% |   PASS   |
|    LIST     | DEVICE_BUFFER |       0       |      1       |  36.929 ms |       1.88% |  36.988 ms |       1.42% |    59.350 us |   0.16% |   PASS   |
|    LIST     | DEVICE_BUFFER |     1000      |      1       |  36.510 ms |       1.91% |  36.558 ms |       1.66% |    48.536 us |   0.13% |   PASS   |
|    LIST     | DEVICE_BUFFER |       0       |      32      |  35.513 ms |       2.00% |  35.490 ms |       1.77% |   -23.411 us |  -0.07% |   PASS   |
|    LIST     | DEVICE_BUFFER |     1000      |      32      |  35.755 ms |       1.99% |  35.728 ms |       1.64% |   -27.564 us |  -0.08% |   PASS   |
|   STRUCT    | DEVICE_BUFFER |       0       |      1       |  43.456 ms |       1.35% |  43.537 ms |       1.16% |    81.405 us |   0.19% |   PASS   |
|   STRUCT    | DEVICE_BUFFER |     1000      |      1       |  25.549 ms |       2.54% |  25.698 ms |       1.90% |   149.295 us |   0.58% |   PASS   |
|   STRUCT    | DEVICE_BUFFER |       0       |      32      |  43.103 ms |       1.87% |  43.019 ms |       1.59% |   -84.825 us |  -0.20% |   PASS   |
|   STRUCT    | DEVICE_BUFFER |     1000      |      32      |  23.462 ms |       2.81% |  23.432 ms |       1.88% |   -30.434 us |  -0.13% |   PASS   |
```

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #15332
  • Loading branch information
etseidl authored May 8, 2024
1 parent eaf5556 commit ffbdd24
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 53 deletions.
72 changes: 26 additions & 46 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ constexpr int rolling_buf_size = decode_block_size * 2;
constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size<decode_block_size>();

template <bool nullable, typename level_t, typename state_buf>
static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_value_count,
page_state_s* s,
state_buf* sb,
level_t const* const def,
int t,
bool nullable_with_nulls)
static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(
int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t)
{
constexpr int num_warps = decode_block_size / cudf::detail::warp_size;
constexpr int max_batch_size = num_warps * cudf::detail::warp_size;
Expand All @@ -63,13 +59,9 @@ static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_v
// definition level. only need to process for nullable columns
int d = 0;
if constexpr (nullable) {
if (nullable_with_nulls) {
d = t < batch_size
? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
: -1;
} else {
d = t < batch_size ? 1 : -1;
}
d = t < batch_size
? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
: -1;
}

int const thread_value_count = t + 1;
Expand Down Expand Up @@ -426,25 +418,21 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}
// only need to process definition levels if the column has nulls
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();

next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
next_valid_count =
gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
processed_count, s, sb, nullptr, t);
}
__syncthreads();

Expand Down Expand Up @@ -547,26 +535,22 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}
// only need to process definition levels if the column has nulls
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();

// count of valid items in this batch
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
next_valid_count =
gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
processed_count, s, sb, nullptr, t);
}
__syncthreads();

Expand Down Expand Up @@ -671,25 +655,21 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}
// only need to process definition levels if the column has nulls
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();

next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
next_valid_count =
gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
processed_count, s, sb, nullptr, t);
}
__syncthreads();

Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1498,8 +1498,10 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
// if we haven't already processed this column because it is part of a struct hierarchy
else if (out_buf.size == 0) {
// add 1 for the offset if this is a list column
out_buf.create(
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(
out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows,
cudf::mask_state::ALL_VALID,
_stream,
_mr);
}
Expand Down Expand Up @@ -1577,7 +1579,8 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }

// allocate
out_buf.create(size, _stream, _mr);
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr);
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ void copy_buffer_data(string_policy const& buff, string_policy& new_buff)
} // namespace

template <class string_policy>
void column_buffer_base<string_policy>::create(size_type _size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
void column_buffer_base<string_policy>::create_with_mask(size_type _size,
cudf::mask_state null_mask_state,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
size = _size;
_mr = mr;
Expand All @@ -111,11 +112,19 @@ void column_buffer_base<string_policy>::create(size_type _size,
default: _data = create_data(type, size, stream, _mr); break;
}
if (is_nullable) {
_null_mask = cudf::detail::create_null_mask(
size, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), _mr);
_null_mask =
cudf::detail::create_null_mask(size, null_mask_state, rmm::cuda_stream_view(stream), _mr);
}
}

template <class string_policy>
void column_buffer_base<string_policy>::create(size_type _size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
create_with_mask(_size, mask_state::ALL_NULL, stream, mr);
}

template <class string_policy>
string_policy column_buffer_base<string_policy>::empty_like(string_policy const& input)
{
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ class column_buffer_base {
// preprocessing steps such as in the Parquet reader
void create(size_type _size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

// like create(), but also takes a `cudf::mask_state` to allow initializing the null mask as
// something other than `ALL_NULL`
void create_with_mask(size_type _size,
cudf::mask_state null_mask_state,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

// Create a new column_buffer that has empty data but with the same basic information as the
// input column, including same type, nullability, name, and user_data.
static string_policy empty_like(string_policy const& input);
Expand Down

0 comments on commit ffbdd24

Please sign in to comment.