Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Skip decode steps in Parquet reader when nullable columns have no nulls #15332

Merged
merged 35 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
fe619fd
skip block scans when no nulls are present
etseidl Mar 18, 2024
c0019e8
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Mar 18, 2024
ea6c3d8
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Mar 19, 2024
95d99aa
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Mar 20, 2024
b9601eb
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Mar 20, 2024
5683751
Merge branch 'branch-24.06' into no_nulls
etseidl Mar 21, 2024
b548050
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Mar 22, 2024
b40a5be
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Mar 25, 2024
d12d0f1
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Mar 27, 2024
29d48c3
Merge branch 'branch-24.06' into no_nulls
etseidl Mar 27, 2024
5e080e0
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 1, 2024
799cbd8
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Apr 2, 2024
d6e65a9
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 4, 2024
8bbf94a
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 6, 2024
75de100
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Apr 8, 2024
fe94074
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 9, 2024
a3c214a
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 9, 2024
abfb280
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 10, 2024
858031b
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 12, 2024
96a1c22
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 15, 2024
ba9dfe9
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 16, 2024
e87cd18
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 16, 2024
ab2e3e3
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Apr 17, 2024
e465283
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 18, 2024
9287d8d
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Apr 23, 2024
e0be012
Merge remote-tracking branch 'origin/branch-24.06' into no_nulls
etseidl Apr 24, 2024
ff6825c
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Apr 26, 2024
b26bdf1
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl Apr 29, 2024
cfce61d
Merge branch 'branch-24.06' into no_nulls
etseidl Apr 30, 2024
5490351
Merge branch 'rapidsai:branch-24.06' into no_nulls
etseidl May 2, 2024
29f304a
Merge branch 'branch-24.06' into no_nulls
etseidl May 3, 2024
0577a53
fix changes from merge
etseidl May 3, 2024
b381c31
Merge branch 'branch-24.06' into no_nulls
etseidl May 3, 2024
6775d95
reword comments per review suggestion
etseidl May 3, 2024
dfa59e6
formatting for fix for incorrect comments that were left in place due…
etseidl May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 26 additions & 46 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ constexpr int rolling_buf_size = decode_block_size * 2;
constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size<decode_block_size>();

template <bool nullable, typename level_t, typename state_buf>
static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_value_count,
page_state_s* s,
state_buf* sb,
level_t const* const def,
int t,
bool nullable_with_nulls)
static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(
int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t)
{
constexpr int num_warps = decode_block_size / cudf::detail::warp_size;
constexpr int max_batch_size = num_warps * cudf::detail::warp_size;
Expand All @@ -63,13 +59,9 @@ static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(int32_t target_v
// definition level. only need to process for nullable columns
int d = 0;
if constexpr (nullable) {
if (nullable_with_nulls) {
d = t < batch_size
? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
: -1;
} else {
d = t < batch_size ? 1 : -1;
}
d = t < batch_size
? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
: -1;
}

int const thread_value_count = t + 1;
Expand Down Expand Up @@ -426,25 +418,21 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}
// only need to process definition levels if the column has nulls
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();

next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
next_valid_count =
gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
processed_count, s, sb, nullptr, t);
}
__syncthreads();

Expand Down Expand Up @@ -547,26 +535,22 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}
// only need to process definition levels if the column has nulls
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();

// count of valid items in this batch
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
next_valid_count =
gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
processed_count, s, sb, nullptr, t);
}
__syncthreads();

Expand Down Expand Up @@ -671,25 +655,21 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
while (s->error == 0 && processed_count < s->page.num_input_values) {
int next_valid_count;

// only need to process definition levels if this is a nullable column
if (nullable) {
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();
} else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
}
// only need to process definition levels if the column has nulls
if (nullable_with_nulls) {
processed_count += def_decoder.decode_next(t);
__syncthreads();

next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(
processed_count, s, sb, def, t, nullable_with_nulls);
next_valid_count =
gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
}
// if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
// this function call entirely since all it will ever generate is a mapping of (i -> i) for
// nz_idx. gpuDecodeValues would be the only work that happens.
else {
processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
processed_count, s, sb, nullptr, t, false);
processed_count, s, sb, nullptr, t);
}
__syncthreads();

Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1489,8 +1489,10 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
// if we haven't already processed this column because it is part of a struct hierarchy
else if (out_buf.size == 0) {
// add 1 for the offset if this is a list column
out_buf.create(
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(
out_buf.type.id() == type_id::LIST && l_idx < max_depth ? num_rows + 1 : num_rows,
cudf::mask_state::ALL_VALID,
_stream,
_mr);
}
Expand Down Expand Up @@ -1568,7 +1570,8 @@ void reader::impl::allocate_columns(size_t skip_rows, size_t num_rows, bool uses
if (out_buf.type.id() == type_id::LIST && l_idx < max_depth) { size++; }

// allocate
out_buf.create(size, _stream, _mr);
// we're going to start null mask as all valid and then turn bits off if necessary
out_buf.create_with_mask(size, cudf::mask_state::ALL_VALID, _stream, _mr);
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions cpp/src/io/utilities/column_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ void copy_buffer_data(string_policy const& buff, string_policy& new_buff)
} // namespace

template <class string_policy>
void column_buffer_base<string_policy>::create(size_type _size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
void column_buffer_base<string_policy>::create_with_mask(size_type _size,
cudf::mask_state null_mask_state,
vuule marked this conversation as resolved.
Show resolved Hide resolved
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
size = _size;
_mr = mr;
Expand All @@ -111,11 +112,19 @@ void column_buffer_base<string_policy>::create(size_type _size,
default: _data = create_data(type, size, stream, _mr); break;
}
if (is_nullable) {
_null_mask = cudf::detail::create_null_mask(
size, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), _mr);
_null_mask =
cudf::detail::create_null_mask(size, null_mask_state, rmm::cuda_stream_view(stream), _mr);
}
}

template <class string_policy>
// Backward-compatible create(): delegates to create_with_mask() with
// mask_state::ALL_NULL, preserving the original behavior (null mask starts
// all-null and bits are turned on as valid values are decoded). Callers that
// want a different starting mask state call create_with_mask() directly.
void column_buffer_base<string_policy>::create(size_type _size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
create_with_mask(_size, mask_state::ALL_NULL, stream, mr);
}

template <class string_policy>
string_policy column_buffer_base<string_policy>::empty_like(string_policy const& input)
{
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/io/utilities/column_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ class column_buffer_base {
// preprocessing steps such as in the Parquet reader
void create(size_type _size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr);

// like create(), but also takes a `cudf::mask_state` to allow initializing the null mask as
// something other than `ALL_NULL`
void create_with_mask(size_type _size,
cudf::mask_state null_mask_state,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

// Create a new column_buffer that has empty data but with the same basic information as the
// input column, including same type, nullability, name, and user_data.
static string_policy empty_like(string_policy const& input);
Expand Down
Loading