Skip to content

Commit

Permalink
Mukernels strings (#17286)
Browse files Browse the repository at this point in the history
Moves parquet string decoding from its stand-alone kernel to using the templated generic kernel.  To optimize performance, the scheme for copying values to the output has changed.  The details of this scheme are in the gpuDecodeString(), but briefly: 

The block size is 128 threads. We try to have the threads in the block share the copying work such that, each thread copies (up to) 4 bytes per memcpy (showed the best performance). So, for a given batch of strings, the longer the average string size is, the more threads that work together to copy it. We cap this at 32 threads per string (a whole warp) for strings longer than 64 bytes (if length 65, 16 threads would require copying 5 chars each).  

For short strings we use a minimum of 4 threads per string: this results in at most 32 simultaneous string copies.  We can't go more than 32 simultaneous copies because performance decreases.  This is presumably because on a cache hit, the cache line size is 128 bytes, and with so many threads running across the blocks we run out of room in the cache. 

### Benchmark Results (Gaussian-distributed string lengths):
* NO dictionary, length from 0 - 32: No difference
* NO dictionary, larger lengths (32 - 64, 16 - 80, 64 - 128, etc.): 10% - 20% faster. 
* Dictionary, cardinality 0: 0% - 15% faster. 
* Dictionary, cardinality 1000, length from 0 - 32: 30% - 35% faster. 
* Dictionary, cardinality 1000, larger lengths (32 - 64, 16 - 80, 64 - 128, etc.): 50% - 60% faster. 
* Selected customer data: 5% faster. 

These performance improvements also hold for [this previous long-string performance issue](#15297). The primary source of these improvements is having all 128 threads in the block helping to copy the string, whereas before we were only using one warp to do the copy (due to the caching issues). The performance of the non-dictionary and zero-cardinality results are limited because we are bound by the time needed to copy the string data from global memory.  For cardinality 1000 dictionary data, the requested strings are often still in the cache and the full benefit of the better thread utilization can be realized.

Authors:
  - Paul Mattione (https://github.com/pmattione-nvidia)

Approvers:
  - https://github.com/nvdbaranec
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #17286
  • Loading branch information
pmattione-nvidia authored Jan 15, 2025
1 parent e272f1e commit 834565a
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 376 deletions.
48 changes: 47 additions & 1 deletion cpp/benchmarks/io/parquet/parquet_reader_input.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,6 +94,43 @@ void BM_parquet_read_data(nvbench::state& state,
state, data_profile_builder().cardinality(cardinality).avg_run_length(run_length), type_list);
}

void BM_parquet_read_long_strings(nvbench::state& state)
{
auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
auto const run_length = static_cast<cudf::size_type>(state.get_int64("run_length"));

auto const d_type = get_type_or_group(static_cast<int32_t>(data_type::STRING));
auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
auto const compression = cudf::io::compression_type::SNAPPY;
cuio_source_sink_pair source_sink(source_type);

auto const avg_string_length = static_cast<cudf::size_type>(state.get_int64("avg_string_length"));
// corresponds to 3 sigma (full width 6 sigma: 99.7% of range)
auto const half_width = static_cast<cudf::size_type>(state.get_int64("half_width_string_length"));
auto const length_min = avg_string_length - half_width;
auto const length_max = avg_string_length + half_width;

data_profile profile =
data_profile_builder()
.cardinality(cardinality)
.avg_run_length(run_length)
.distribution(data_type::STRING, distribution_id::NORMAL, length_min, length_max);

auto const num_rows_written = [&]() {
auto const tbl = create_random_table(
cycle_dtypes(d_type, num_cols), table_size_bytes{data_size}, profile); // THIS
auto const view = tbl->view();

cudf::io::parquet_writer_options write_opts =
cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
.compression(compression);
cudf::io::write_parquet(write_opts);
return view.num_rows();
}();

parquet_read_common(num_rows_written, num_cols, source_sink, state);
}

template <data_type DataType>
void BM_parquet_read_fixed_width_struct(nvbench::state& state,
nvbench::type_list<nvbench::enum_type<DataType>> type_list)
Expand Down Expand Up @@ -366,3 +403,12 @@ NVBENCH_BENCH_TYPES(BM_parquet_read_fixed_width_struct, NVBENCH_TYPE_AXES(d_type
.set_min_samples(4)
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32});

NVBENCH_BENCH(BM_parquet_read_long_strings)
.set_name("parquet_read_long_strings")
.add_string_axis("io_type", {"DEVICE_BUFFER"})
.set_min_samples(4)
.add_int64_axis("cardinality", {0, 1000})
.add_int64_axis("run_length", {1, 32})
.add_int64_axis("avg_string_length", {16, 48, 96})
.add_int64_axis("half_width_string_length", {16, 32, 64}); // length = avg +/- half_width
189 changes: 136 additions & 53 deletions cpp/src/io/parquet/decode_fixed.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
#include "page_data.cuh"
#include "page_decode.cuh"
#include "page_string_utils.cuh"
#include "parquet_gpu.hpp"
#include "rle_stream.cuh"

#include <cudf/detail/utilities/cuda.cuh>

#include <cooperative_groups.h>

namespace cudf::io::parquet::detail {

namespace {
Expand Down Expand Up @@ -173,14 +176,6 @@ __device__ void gpuDecodeFixedWidthValues(
}
}

template <int block_size, bool has_lists_t, typename state_buf>
struct decode_fixed_width_values_func {
__device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t)
{
gpuDecodeFixedWidthValues<block_size, has_lists_t, state_buf>(s, sb, start, end, t);
}
};

template <int block_size, bool has_lists_t, typename state_buf>
__device__ inline void gpuDecodeFixedWidthSplitValues(
page_state_s* s, state_buf* const sb, int start, int end, int t)
Expand Down Expand Up @@ -285,14 +280,6 @@ __device__ inline void gpuDecodeFixedWidthSplitValues(
}
}

template <int block_size, bool has_lists_t, typename state_buf>
struct decode_fixed_width_split_values_func {
__device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t)
{
gpuDecodeFixedWidthSplitValues<block_size, has_lists_t, state_buf>(s, sb, start, end, t);
}
};

template <int decode_block_size, typename level_t, typename state_buf>
static __device__ int gpuUpdateValidityAndRowIndicesNested(
int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t)
Expand Down Expand Up @@ -403,6 +390,7 @@ static __device__ int gpuUpdateValidityAndRowIndicesNested(
if (t == 0) {
// update valid value count for decoding and total # of values we've processed
max_depth_ni.valid_count = max_depth_valid_count;
max_depth_ni.value_count = value_count; // Needed AT LEAST for strings!
s->nz_count = max_depth_valid_count;
s->input_value_count = value_count;
s->input_row_count = value_count;
Expand Down Expand Up @@ -888,6 +876,60 @@ __device__ int skip_decode(stream_type& parquet_stream, int num_to_skip, int t)
return num_skipped;
}

template <decode_kernel_mask kernel_mask_t>
constexpr bool has_dict()
{
return (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST) ||
(kernel_mask_t == decode_kernel_mask::STRING_DICT) ||
(kernel_mask_t == decode_kernel_mask::STRING_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::STRING_DICT_LIST);
}

template <decode_kernel_mask kernel_mask_t>
constexpr bool has_bools()
{
return (kernel_mask_t == decode_kernel_mask::BOOLEAN) ||
(kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) ||
(kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST);
}

template <decode_kernel_mask kernel_mask_t>
constexpr bool has_nesting()
{
return (kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) ||
(kernel_mask_t == decode_kernel_mask::STRING_NESTED) ||
(kernel_mask_t == decode_kernel_mask::STRING_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::STRING_STREAM_SPLIT_NESTED);
}

template <decode_kernel_mask kernel_mask_t>
constexpr bool has_lists()
{
return (kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST) ||
(kernel_mask_t == decode_kernel_mask::STRING_LIST) ||
(kernel_mask_t == decode_kernel_mask::STRING_DICT_LIST) ||
(kernel_mask_t == decode_kernel_mask::STRING_STREAM_SPLIT_LIST);
}

template <decode_kernel_mask kernel_mask_t>
constexpr bool is_split_decode()
{
return (kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST) ||
(kernel_mask_t == decode_kernel_mask::STRING_STREAM_SPLIT) ||
(kernel_mask_t == decode_kernel_mask::STRING_STREAM_SPLIT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::STRING_STREAM_SPLIT_LIST);
}

/**
* @brief Kernel for computing fixed width non dictionary column data stored in the pages
*
Expand All @@ -910,34 +952,22 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
size_t num_rows,
kernel_error::pointer error_code)
{
constexpr bool has_dict_t = (kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST);
constexpr bool has_bools_t = (kernel_mask_t == decode_kernel_mask::BOOLEAN) ||
(kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) ||
(kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST);
constexpr bool has_nesting_t =
(kernel_mask_t == decode_kernel_mask::BOOLEAN_NESTED) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED);
constexpr bool has_lists_t =
(kernel_mask_t == decode_kernel_mask::BOOLEAN_LIST) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_DICT_LIST) ||
(kernel_mask_t == decode_kernel_mask::FIXED_WIDTH_NO_DICT_LIST) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST);
constexpr bool split_decode_t =
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) ||
(kernel_mask_t == decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_LIST);
constexpr bool has_dict_t = has_dict<kernel_mask_t>();
constexpr bool has_bools_t = has_bools<kernel_mask_t>();
constexpr bool has_nesting_t = has_nesting<kernel_mask_t>();
constexpr bool has_lists_t = has_lists<kernel_mask_t>();
constexpr bool split_decode_t = is_split_decode<kernel_mask_t>();
constexpr bool has_strings_t =
(static_cast<uint32_t>(kernel_mask_t) & STRINGS_MASK_NON_DELTA) != 0;

constexpr int rolling_buf_size = decode_block_size_t * 2;
constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size<decode_block_size_t>();

__shared__ __align__(16) page_state_s state_g;
using state_buf_t = page_state_buffers_s<rolling_buf_size, // size of nz_idx buffer
has_dict_t || has_bools_t ? rolling_buf_size : 1,
1>;
constexpr bool use_dict_buffers = has_dict_t || has_bools_t || has_strings_t;
using state_buf_t = page_state_buffers_s<rolling_buf_size, // size of nz_idx buffer
use_dict_buffers ? rolling_buf_size : 1,
has_strings_t ? rolling_buf_size : 1>;
__shared__ __align__(16) state_buf_t state_buffers;

page_state_s* const s = &state_g;
Expand All @@ -961,12 +991,6 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
return;
}

using value_decoder_type = std::conditional_t<
split_decode_t,
decode_fixed_width_split_values_func<decode_block_size_t, has_lists_t, state_buf_t>,
decode_fixed_width_values_func<decode_block_size_t, has_lists_t, state_buf_t>>;
value_decoder_type decode_values;

bool const should_process_nulls = is_nullable(s) && maybe_has_nulls(s);

// shared buffer. all shared memory is suballocated out of here
Expand All @@ -979,12 +1003,16 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)

// setup all shared memory buffers
int shared_offset = 0;
auto rep_runs = reinterpret_cast<rle_run*>(shared_buf + shared_offset);

auto rep_runs = reinterpret_cast<rle_run*>(shared_buf + shared_offset);
if constexpr (has_lists_t) { shared_offset += rle_run_buffer_bytes; }

auto dict_runs = reinterpret_cast<rle_run*>(shared_buf + shared_offset);
if constexpr (has_dict_t) { shared_offset += rle_run_buffer_bytes; }

auto bool_runs = reinterpret_cast<rle_run*>(shared_buf + shared_offset);
if constexpr (has_bools_t) { shared_offset += rle_run_buffer_bytes; }

auto def_runs = reinterpret_cast<rle_run*>(shared_buf + shared_offset);

// initialize the stream decoders (requires values computed in setupLocalPageInfo)
Expand Down Expand Up @@ -1031,19 +1059,25 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
// - valid_count: number of non-null values we have decoded so far. In each iteration of the
// loop below, we look at the number of valid items (which could be all for non-nullable),
// and valid_count is that running count.
int processed_count = 0;
int valid_count = 0;
int processed_count = 0;
int valid_count = 0;
size_t string_output_offset = 0;

// Skip ahead in the decoding so that we don't repeat work (skipped_leaf_values = 0 for non-lists)
auto const skipped_leaf_values = s->page.skipped_leaf_values;
if constexpr (has_lists_t) {
auto const skipped_leaf_values = s->page.skipped_leaf_values;
if (skipped_leaf_values > 0) {
if (should_process_nulls) {
skip_decode<rolling_buf_size>(def_decoder, skipped_leaf_values, t);
}
processed_count = skip_decode<rolling_buf_size>(rep_decoder, skipped_leaf_values, t);
if constexpr (has_dict_t) {
skip_decode<rolling_buf_size>(dict_stream, skipped_leaf_values, t);
} else if constexpr (has_strings_t) {
gpuInitStringDescriptors<true>(
s, sb, skipped_leaf_values, cooperative_groups::this_thread_block());
if (t == 0) { s->dict_pos = processed_count; }
__syncthreads();
}
}
}
Expand Down Expand Up @@ -1091,13 +1125,17 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
}
__syncthreads();

// if we have dictionary or bool data
// We want to limit the number of dictionary/bool items we decode, that correspond to
// the rows we have processed in this iteration that are valid.
// We want to limit the number of dictionary/bool/string items we decode,
// that correspond to the rows we have processed in this iteration that are valid.
// We know the number of valid rows to process with: next_valid_count - valid_count.
if constexpr (has_dict_t) {
dict_stream.decode_next(t, next_valid_count - valid_count);
__syncthreads();
} else if constexpr (has_strings_t) {
auto const target_pos = next_valid_count + skipped_leaf_values;
gpuInitStringDescriptors<false>(s, sb, target_pos, cooperative_groups::this_thread_block());
if (t == 0) { s->dict_pos = target_pos; }
__syncthreads();
} else if constexpr (has_bools_t) {
if (bools_are_rle_stream) {
bool_stream.decode_next(t, next_valid_count - valid_count);
Expand All @@ -1108,11 +1146,28 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size_t, 8)
}

// decode the values themselves
decode_values(s, sb, valid_count, next_valid_count, t);
if constexpr (has_strings_t) {
string_output_offset = gpuDecodeString<decode_block_size_t, has_lists_t, split_decode_t>(
s, sb, valid_count, next_valid_count, t, string_output_offset);
} else if constexpr (split_decode_t) {
gpuDecodeFixedWidthSplitValues<decode_block_size_t, has_lists_t>(
s, sb, valid_count, next_valid_count, t);
} else {
gpuDecodeFixedWidthValues<decode_block_size_t, has_lists_t>(
s, sb, valid_count, next_valid_count, t);
}
__syncthreads();

valid_count = next_valid_count;
}

// Now turn the array of lengths into offsets, but skip if this is a large string column. In the
// latter case, offsets will be computed during string column creation.
if constexpr (has_strings_t) {
if (!s->col.is_large_string_col) {
convert_small_string_lengths_to_offsets<decode_block_size_t, has_lists_t>(s);
}
}
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

Expand Down Expand Up @@ -1193,6 +1248,34 @@ void __host__ DecodePageData(cudf::detail::hostdevice_span<PageInfo> pages,
case decode_kernel_mask::BOOLEAN_LIST:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::BOOLEAN_LIST>{});
break;
case decode_kernel_mask::STRING:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING>{});
break;
case decode_kernel_mask::STRING_NESTED:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_NESTED>{});
break;
case decode_kernel_mask::STRING_LIST:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_LIST>{});
break;
case decode_kernel_mask::STRING_DICT:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_DICT>{});
break;
case decode_kernel_mask::STRING_DICT_NESTED:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_DICT_NESTED>{});
break;
case decode_kernel_mask::STRING_DICT_LIST:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_DICT_LIST>{});
break;
case decode_kernel_mask::STRING_STREAM_SPLIT:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_STREAM_SPLIT>{});
break;
case decode_kernel_mask::STRING_STREAM_SPLIT_NESTED:
launch_kernel(int_tag_t<128>{},
kernel_tag_t<decode_kernel_mask::STRING_STREAM_SPLIT_NESTED>{});
break;
case decode_kernel_mask::STRING_STREAM_SPLIT_LIST:
launch_kernel(int_tag_t<128>{}, kernel_tag_t<decode_kernel_mask::STRING_STREAM_SPLIT_LIST>{});
break;
default: CUDF_EXPECTS(false, "Kernel type not handled by this function"); break;
}
}
Expand Down
Loading

0 comments on commit 834565a

Please sign in to comment.