
Add DELTA_LENGTH_BYTE_ARRAY encoder and decoder for Parquet #14590

Merged 37 commits, Dec 20, 2023

Commits
a95259f
add delta length byte array encoder/decoder
etseidl Dec 6, 2023
27dfcdd
change encoding in file
etseidl Dec 6, 2023
73573f3
rename some things
etseidl Dec 6, 2023
b454b76
a few cleanups
etseidl Dec 6, 2023
4821128
Merge remote-tracking branch 'origin/branch-24.02' into dlba_enc_dec
etseidl Dec 6, 2023
fb50fcb
finish merge of size statistics
etseidl Dec 6, 2023
394fe81
Merge branch 'rapidsai:branch-24.02' into dlba_enc_dec
etseidl Dec 8, 2023
a2181bc
Merge remote-tracking branch 'origin/branch-24.02' into dlba_enc_dec
etseidl Dec 9, 2023
07af187
add syncthreads
etseidl Dec 9, 2023
fae7388
Merge branch 'rapidsai:branch-24.02' into dlba_enc_dec
etseidl Dec 12, 2023
aa94791
Merge branch 'branch-24.02' into dlba_enc_dec
etseidl Dec 12, 2023
f806a48
Merge branch 'branch-24.02' into dlba_enc_dec
etseidl Dec 12, 2023
080e89d
change unsupported encoding to 15
etseidl Dec 12, 2023
0827fd3
Merge branch 'dlba_enc_dec' of github.com:etseidl/cudf into dlba_enc_dec
etseidl Dec 12, 2023
7e9d752
Merge branch 'branch-24.02' into dlba_enc_dec
etseidl Dec 13, 2023
8b0f277
pass random number generator into data gen functions
etseidl Dec 14, 2023
4bbbf63
implement suggestion from review
etseidl Dec 14, 2023
901644d
implement suggestion from review
etseidl Dec 15, 2023
0934708
remove __host__ decorator
etseidl Dec 15, 2023
ad42470
change skip_values_and_sum to run on a single warp
etseidl Dec 15, 2023
c552ffb
Merge branch 'branch-24.02' into dlba_enc_dec
etseidl Dec 15, 2023
ace5be3
implement suggestion from review
etseidl Dec 15, 2023
3523a36
move delta char len calculation
etseidl Dec 15, 2023
c028f2a
a few cleanups
etseidl Dec 16, 2023
4c2dc56
remove some outdated TODOs and superfluous threadfences
etseidl Dec 16, 2023
c07bea2
handle non-string byte arrays
etseidl Dec 16, 2023
62f8b4a
parquet-mr does not like duplicate column names
etseidl Dec 16, 2023
36478bf
Merge branch 'branch-24.02' into dlba_enc_dec
etseidl Dec 18, 2023
b4e6999
fix for writing all-null column
etseidl Dec 19, 2023
385dce1
fix for reading single null row
etseidl Dec 19, 2023
edd3c13
Merge remote-tracking branch 'origin/branch-24.02' into dlba_enc_dec
etseidl Dec 19, 2023
6a25a6a
finish merge
etseidl Dec 19, 2023
0d0c95f
make sure header is written if all values are null
etseidl Dec 19, 2023
e0d8cf1
add extra delta tests
etseidl Dec 20, 2023
a65f512
change param name to match use
etseidl Dec 20, 2023
b77b9ee
Merge branch 'branch-24.02' into dlba_enc_dec
etseidl Dec 20, 2023
fc282fe
try 2 at consistent naming
etseidl Dec 20, 2023
45 changes: 44 additions & 1 deletion cpp/src/io/parquet/delta_binary.cuh
@@ -101,7 +101,7 @@ struct delta_binary_decoder {
uint8_t const* cur_mb_start; // pointer to the start of the current mini-block data
uint8_t const* cur_bitwidths; // pointer to the bitwidth array in the block

- uleb128_t value[delta_rolling_buf_size];  // circular buffer of delta values
+ zigzag128_t value[delta_rolling_buf_size];  // circular buffer of delta values
@ttnghia (Contributor), Dec 14, 2023:

Will this be a breaking change? It is changing from unsigned to signed.

@etseidl (Contributor, Author):

Should not be user visible; results are cast elsewhere anyway.

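For context on the exchange above: DELTA_BINARY_PACKED stores the header's first value (and each block's minimum delta) zigzag-encoded, so decoded values are inherently signed. Below is a minimal sketch of the zigzag decode step, assuming uleb128_t/zigzag128_t are cuDF's unsigned/signed 128-bit integer aliases; the helper name is illustrative, not from this PR.

using uleb128_t   = unsigned __int128;  // assumed alias, matching cuDF's typedefs
using zigzag128_t = __int128;           // assumed alias (signed counterpart)

// Zigzag maps signed integers onto unsigned ones so small magnitudes stay
// small: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ... Decoding inverts the mapping;
// keeping the rolling buffer signed preserves negative deltas before any
// downstream cast.
inline zigzag128_t zigzag_decode(uleb128_t u)
{
  return static_cast<zigzag128_t>(u >> 1) ^ -static_cast<zigzag128_t>(u & 1);
}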

// returns the value stored in the `value` array at index
// `rolling_index<delta_rolling_buf_size>(idx)`. If `idx` is `0`, then return `first_value`.
@@ -299,6 +299,49 @@ struct delta_binary_decoder {
}
}

// Decodes and skips values until the block containing the value after `skip` is reached.
// Keeps a running sum of the values and returns that upon exit. Called by all threads in
// warp 0. Result is only valid on thread 0.
// This is intended for use only by the DELTA_LENGTH_BYTE_ARRAY decoder.
inline __device__ size_t skip_values_and_sum(int skip)
{
using cudf::detail::warp_size;
// DELTA_LENGTH_BYTE_ARRAY lengths are encoded as INT32 by convention (since the PLAIN encoding
// uses 4-byte lengths).
using delta_length_type = int32_t;
using warp_reduce = cub::WarpReduce<size_t>;
__shared__ warp_reduce::TempStorage temp_storage;
int const t = threadIdx.x;

// initialize sum with first value, which is stored in the block header. cast to
// `delta_length_type` to ensure the value is interpreted properly before promoting it
// back to `size_t`.
size_t sum = static_cast<delta_length_type>(value_at(0));

// if only skipping one value, we're done already
if (skip == 1) { return sum; }

// need to do in multiple passes if values_per_mb != 32
uint32_t const num_pass = values_per_mb / warp_size;

while (current_value_idx < skip && current_value_idx < num_encoded_values(true)) {
calc_mini_block_values(t);

int const idx = current_value_idx + t;

for (uint32_t p = 0; p < num_pass; p++) {
auto const pidx = idx + p * warp_size;
size_t const val = pidx < skip ? static_cast<delta_length_type>(value_at(pidx)) : 0;
auto const warp_sum = warp_reduce(temp_storage).Sum(val);
if (t == 0) { sum += warp_sum; }
}
if (t == 0) { setup_next_mini_block(true); }
__syncwarp();
}

return sum;
}

// decodes the current mini block and stores the values obtained. should only be called by
// a single warp.
inline __device__ void decode_batch()
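To make the semantics of skip_values_and_sum concrete: the sum of the first skip decoded lengths is the byte offset at which a bounds page's string data actually starts. A single-threaded reference version follows (a sketch for exposition only; the kernel computes the same quantity warp-parallel via cub::WarpReduce):

#include <cstddef>
#include <cstdint>

// Reference semantics of skip_values_and_sum: given the page's decoded
// DELTA_LENGTH_BYTE_ARRAY lengths, the sum of the first `skip` of them is the
// offset of the first string value the caller still needs.
size_t skip_values_and_sum_reference(int32_t const* lengths, int skip)
{
  size_t sum = 0;
  for (int i = 0; i < skip; ++i) {
    sum += static_cast<size_t>(lengths[i]);  // lengths are INT32 by convention
  }
  return sum;
}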
7 changes: 6 additions & 1 deletion cpp/src/io/parquet/delta_enc.cuh
@@ -186,6 +186,7 @@ class delta_binary_packer {
_bitpack_tmp = _buffer + delta::buffer_size;
_current_idx = 0;
_values_in_buffer = 0;
_buffer[0] = 0;
}

// Each thread calls this to add its current value.
@@ -215,7 +216,7 @@
}

// Called by each thread to flush data to the sink.
- inline __device__ uint8_t const* flush()
+ inline __device__ uint8_t* flush()
{
using cudf::detail::warp_size;
__shared__ T block_min;
@@ -224,6 +225,10 @@
int const warp_id = t / warp_size;
int const lane_id = t % warp_size;

// if no values have been written, still need to write the header
if (t == 0 && _current_idx == 0) { write_header(); }

// if there are no values to write, just return
if (_values_in_buffer <= 0) { return _dst; }

// Calculate delta for this thread.
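The flush() change above guarantees that a DELTA_BINARY_PACKED header is emitted even when a page encodes zero values (the all-null case fixed later in this PR), since readers parse the header unconditionally. Per the Parquet specification the header carries four fields, sketched below; the struct is illustrative, not a type from this PR:

// DELTA_BINARY_PACKED header fields, in stream order. All are ULEB128-encoded;
// first_value is additionally zigzag-encoded. total_values may legitimately be
// zero, but the header itself must always be present.
struct delta_header {              // illustrative only
  uint32_t block_size;             // values per block (a multiple of 128)
  uint32_t mini_blocks_per_block;  // must divide block_size evenly
  uint32_t total_values;           // number of encoded values, possibly 0
  int64_t  first_value;            // zigzag-encoded in the byte stream
};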
1 change: 1 addition & 0 deletions cpp/src/io/parquet/page_decode.cuh
@@ -1332,6 +1332,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_run = 0;
} break;
case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
// nothing to do, just don't error
break;
214 changes: 199 additions & 15 deletions cpp/src/io/parquet/page_delta_decode.cu
@@ -18,6 +18,8 @@
#include "page_string_utils.cuh"
#include "parquet_gpu.hpp"

#include <io/utilities/block_utils.cuh>

#include <cudf/detail/utilities/cuda.cuh>

#include <rmm/exec_policy.hpp>
@@ -463,7 +465,7 @@ __global__ void __launch_bounds__(decode_block_size)
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;

// choose a character parallel string copy when the average string is longer than a warp
- auto const use_char_ll = (s->page.str_bytes / s->page.num_valids) > cudf::detail::warp_size;
+ auto const use_char_ll = (s->page.str_bytes / s->page.num_valids) > warp_size;

// copying logic from gpuDecodePageData.
PageNestingDecodeInfo const* nesting_info_base = s->nesting_info;
@@ -493,6 +495,7 @@ __global__ void __launch_bounds__(decode_block_size)
int const leaf_level_index = s->col.max_nesting_depth - 1;
auto strings_data = nesting_info_base[leaf_level_index].string_out;

// sanity check to make sure we can process this page
auto const batch_size = prefix_db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<kernel_error::value_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
@@ -581,18 +584,174 @@ __global__ void __launch_bounds__(decode_block_size)
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

// Decode page data that is DELTA_LENGTH_BYTE_ARRAY packed. This encoding consists of a
// DELTA_BINARY_PACKED array of string lengths, followed by the string data.
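// Illustrative example (not from the PR): the strings ["sun", "moon", "up"]
// are stored as one DELTA_BINARY_PACKED run encoding the lengths [3, 4, 2],
// followed by the concatenated bytes "sunmoonup"; offsets into the character
// data are rebuilt afterwards by an exclusive prefix sum over the lengths.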
template <typename level_t>
__global__ void __launch_bounds__(decode_block_size)
gpuDecodeDeltaLengthByteArray(PageInfo* pages,
device_span<ColumnChunkDesc const> chunks,
size_t min_row,
size_t num_rows,
kernel_error::pointer error_code)
{
using cudf::detail::warp_size;
__shared__ __align__(16) delta_binary_decoder db_state;
__shared__ __align__(16) page_state_s state_g;
__shared__ __align__(16) page_state_buffers_s<delta_rolling_buf_size, 0, 0> state_buffers;
__shared__ __align__(8) uint8_t const* page_string_data;
__shared__ size_t string_offset;

page_state_s* const s = &state_g;
auto* const sb = &state_buffers;
int const page_idx = blockIdx.x;
int const t = threadIdx.x;
int const lane_id = t % warp_size;
auto* const db = &db_state;
[[maybe_unused]] null_count_back_copier _{s, t};

auto const mask = decode_kernel_mask::DELTA_LENGTH_BA;
if (!setupLocalPageInfo(s,
&pages[page_idx],
chunks,
min_row,
num_rows,
mask_filter{mask},
page_processing_stage::DECODE)) {
return;
}

bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;

// copying logic from gpuDecodePageData.
PageNestingDecodeInfo const* nesting_info_base = s->nesting_info;

__shared__ level_t rep[delta_rolling_buf_size]; // circular buffer of repetition level values
__shared__ level_t def[delta_rolling_buf_size]; // circular buffer of definition level values

// skipped_leaf_values will always be 0 for flat hierarchies.
uint32_t const skipped_leaf_values = s->page.skipped_leaf_values;

// initialize delta state
if (t == 0) {
string_offset = 0;
page_string_data = db->find_end_of_block(s->data_start, s->data_end);
}
__syncthreads();

int const leaf_level_index = s->col.max_nesting_depth - 1;

// sanity check to make sure we can process this page
auto const batch_size = db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
return;
}

// if this is a bounds page, then we need to decode up to the first mini-block
// that has a value we need, and set string_offset to the position of the first value in the
// string data block.
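// For example (illustrative), with start_val == 100 warp 0 sums the first 100
// decoded lengths, so string_offset lands on the first byte this page needs.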
auto const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
if (is_bounds_pg && s->page.start_val > 0) {
if (t < warp_size) {
// string_off is only valid on thread 0
auto const string_off = db->skip_values_and_sum(s->page.start_val);
if (t == 0) {
string_offset = string_off;

// if there is no repetition, then we need to work through the whole page, so reset the
// delta decoder to the beginning of the page
if (not has_repetition) { db->init_binary_block(s->data_start, s->data_end); }
}
}
__syncthreads();
}

int string_pos = has_repetition ? s->page.start_val : 0;

while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
uint32_t target_pos;
uint32_t const src_pos = s->src_pos;

if (t < 2 * warp_size) { // warp0..1
target_pos = min(src_pos + 2 * batch_size, s->nz_count + batch_size);
} else { // warp2
target_pos = min(s->nz_count, src_pos + batch_size);
}
// this needs to be here to prevent warp 2 modifying src_pos before all threads have read it
__syncthreads();

// warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
// warp2 waits one cycle for warps 0/1 to produce a batch, and then stuffs string sizes
// into the proper location in the output. warp 3 does nothing until it's time to copy
// string data.
if (t < warp_size) {
// warp 0
// decode repetition and definition levels.
// - update validity vectors
// - updates offsets (for nested columns)
// - produces non-NULL value indices in s->nz_idx for subsequent decoding
gpuDecodeLevels<delta_rolling_buf_size, level_t>(s, sb, target_pos, rep, def, t);
} else if (t < 2 * warp_size) {
// warp 1
db->decode_batch();

} else if (t < 3 * warp_size && src_pos < target_pos) {
// warp 2
int const nproc = min(batch_size, s->page.end_val - string_pos);
string_pos += nproc;

// process the mini-block in batches of 32
for (uint32_t sp = src_pos + lane_id; sp < src_pos + batch_size; sp += 32) {
// the position in the output column/buffer
int dst_pos = sb->nz_idx[rolling_index<delta_rolling_buf_size>(sp)];

// handle skip_rows here. flat hierarchies can just skip up to first_row.
if (!has_repetition) { dst_pos -= s->first_row; }

// fill in offsets array
if (dst_pos >= 0 && sp < target_pos) {
auto const offptr =
reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out) + dst_pos;
*offptr = db->value_at(sp + skipped_leaf_values);
}
__syncwarp();
}

if (lane_id == 0) { s->src_pos = src_pos + batch_size; }
}
__syncthreads();
}

// now turn array of lengths into offsets
int value_count = nesting_info_base[leaf_level_index].value_count;

// if no repetition we haven't calculated start/end bounds and instead just skipped
// values until we reach first_row. account for that here.
if (!has_repetition) { value_count -= s->first_row; }

auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
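// e.g. (illustrative) an exclusive sum over lengths [3, 4, 2] with
// str_offset == 10 produces offsets [10, 13, 17]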

// finally, copy the string data into place
auto const dst = nesting_info_base[leaf_level_index].string_out;
auto const src = page_string_data + string_offset;
memcpy_block<decode_block_size, true>(dst, src, s->page.str_bytes, t);

if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace

/**
* @copydoc cudf::io::parquet::detail::DecodeDeltaBinary
*/
- void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
-                                 cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
-                                 size_t num_rows,
-                                 size_t min_row,
-                                 int level_type_size,
-                                 kernel_error::pointer error_code,
-                                 rmm::cuda_stream_view stream)
+ void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
+                        cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                        size_t num_rows,
+                        size_t min_row,
+                        int level_type_size,
+                        kernel_error::pointer error_code,
+                        rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

@@ -611,13 +770,13 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
/**
* @copydoc cudf::io::parquet::detail::DecodeDeltaByteArray
*/
- void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
-                                    cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
-                                    size_t num_rows,
-                                    size_t min_row,
-                                    int level_type_size,
-                                    kernel_error::pointer error_code,
-                                    rmm::cuda_stream_view stream)
+ void DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
+                           cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                           size_t num_rows,
+                           size_t min_row,
+                           int level_type_size,
+                           kernel_error::pointer error_code,
+                           rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

@@ -633,4 +792,29 @@ void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pa
}
}

/**
* @copydoc cudf::io::parquet::detail::DecodeDeltaLengthByteArray
*/
void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

dim3 const dim_block(decode_block_size, 1);
dim3 const dim_grid(pages.size(), 1); // 1 threadblock per page

if (level_type_size == 1) {
gpuDecodeDeltaLengthByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
} else {
gpuDecodeDeltaLengthByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
pages.device_ptr(), chunks, min_row, num_rows, error_code);
}
}

} // namespace cudf::io::parquet::detail