Skip to content

Commit

Permalink
Add DELTA_LENGTH_BYTE_ARRAY encoder and decoder for Parquet (#14590)
Browse files Browse the repository at this point in the history
Part of #13501. This adds the ability to read and write Parquet pages with DELTA_LENGTH_BYTE_ARRAY encoding.

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Michael Wang (https://github.com/isVoid)
  - Nghia Truong (https://github.com/ttnghia)

URL: #14590
  • Loading branch information
etseidl authored Dec 20, 2023
1 parent 96a1b00 commit f1ff424
Show file tree
Hide file tree
Showing 13 changed files with 818 additions and 55 deletions.
45 changes: 44 additions & 1 deletion cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ struct delta_binary_decoder {
uint8_t const* cur_mb_start; // pointer to the start of the current mini-block data
uint8_t const* cur_bitwidths; // pointer to the bitwidth array in the block

uleb128_t value[delta_rolling_buf_size]; // circular buffer of delta values
zigzag128_t value[delta_rolling_buf_size]; // circular buffer of delta values

// returns the value stored in the `value` array at index
// `rolling_index<delta_rolling_buf_size>(idx)`. If `idx` is `0`, then return `first_value`.
Expand Down Expand Up @@ -299,6 +299,49 @@ struct delta_binary_decoder {
}
}

// Decodes and skips values until the block containing the value after `skip` is reached.
// Keeps a running sum of the values and returns that upon exit.
//
// Must be called by all threads of warp 0: `threadIdx.x` is used directly as the lane index,
// and the warp-wide reduction and `__syncwarp()` calls below need every lane to participate.
// The result is only valid on thread 0 (only thread 0 accumulates the reduced totals).
//
// NOTE(review): a `skip` of 0 still returns the first value rather than 0; the visible caller
// only invokes this with a positive `skip` — confirm before adding other callers.
//
// This is intended for use only by the DELTA_LENGTH_BYTE_ARRAY decoder.
//
// @param skip Number of leading values to skip; values at indices below `skip` are summed
// @return Sum of the skipped values (valid on thread 0 only)
inline __device__ size_t skip_values_and_sum(int skip)
{
  using cudf::detail::warp_size;
  // DELTA_LENGTH_BYTE_ARRAY lengths are encoded as INT32 by convention (since the PLAIN encoding
  // uses 4-byte lengths).
  using delta_length_type = int32_t;
  using warp_reduce       = cub::WarpReduce<size_t>;
  __shared__ warp_reduce::TempStorage temp_storage;
  int const t = threadIdx.x;

  // initialize sum with first value, which is stored in the block header. cast to
  // `delta_length_type` to ensure the value is interpreted properly before promoting it
  // back to `size_t`.
  size_t sum = static_cast<delta_length_type>(value_at(0));

  // if only skipping one value, we're done already
  if (skip == 1) { return sum; }

  // need to do in multiple passes if values_per_mb != 32
  uint32_t const num_pass = values_per_mb / warp_size;

  while (current_value_idx < skip && current_value_idx < num_encoded_values(true)) {
    calc_mini_block_values(t);

    int const idx = current_value_idx + t;

    for (uint32_t p = 0; p < num_pass; p++) {
      auto const pidx = idx + p * warp_size;
      // lanes whose index is at or beyond `skip` contribute nothing to the sum
      size_t const val    = pidx < skip ? static_cast<delta_length_type>(value_at(pidx)) : 0;
      auto const warp_sum = warp_reduce(temp_storage).Sum(val);
      if (t == 0) { sum += warp_sum; }
      // CUB requires a warp-wide barrier before `temp_storage` is reused by the next pass
      __syncwarp();
    }
    // only thread 0 advances the decoder state; the barrier makes the update visible to the
    // rest of the warp before the loop condition is re-evaluated
    if (t == 0) { setup_next_mini_block(true); }
    __syncwarp();
  }

  return sum;
}

// decodes the current mini block and stores the values obtained. should only be called by
// a single warp.
inline __device__ void decode_batch()
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/io/parquet/delta_enc.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ class delta_binary_packer {
_bitpack_tmp = _buffer + delta::buffer_size;
_current_idx = 0;
_values_in_buffer = 0;
_buffer[0] = 0;
}

// Each thread calls this to add its current value.
Expand Down Expand Up @@ -215,7 +216,7 @@ class delta_binary_packer {
}

// Called by each thread to flush data to the sink.
inline __device__ uint8_t const* flush()
inline __device__ uint8_t* flush()
{
using cudf::detail::warp_size;
__shared__ T block_min;
Expand All @@ -224,6 +225,10 @@ class delta_binary_packer {
int const warp_id = t / warp_size;
int const lane_id = t % warp_size;

// if no values have been written, still need to write the header
if (t == 0 && _current_idx == 0) { write_header(); }

// if there are no values to write, just return
if (_values_in_buffer <= 0) { return _dst; }

// Calculate delta for this thread.
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_run = 0;
} break;
case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
// nothing to do, just don't error
break;
Expand Down
214 changes: 199 additions & 15 deletions cpp/src/io/parquet/page_delta_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "page_string_utils.cuh"
#include "parquet_gpu.hpp"

#include <io/utilities/block_utils.cuh>

#include <cudf/detail/utilities/cuda.cuh>

#include <rmm/exec_policy.hpp>
Expand Down Expand Up @@ -463,7 +465,7 @@ __global__ void __launch_bounds__(decode_block_size)
bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;

// choose a character parallel string copy when the average string is longer than a warp
auto const use_char_ll = (s->page.str_bytes / s->page.num_valids) > cudf::detail::warp_size;
auto const use_char_ll = (s->page.str_bytes / s->page.num_valids) > warp_size;

// copying logic from gpuDecodePageData.
PageNestingDecodeInfo const* nesting_info_base = s->nesting_info;
Expand Down Expand Up @@ -493,6 +495,7 @@ __global__ void __launch_bounds__(decode_block_size)
int const leaf_level_index = s->col.max_nesting_depth - 1;
auto strings_data = nesting_info_base[leaf_level_index].string_out;

// sanity check to make sure we can process this page
auto const batch_size = prefix_db->values_per_mb;
if (batch_size > max_delta_mini_block_size) {
set_error(static_cast<kernel_error::value_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
Expand Down Expand Up @@ -581,18 +584,174 @@ __global__ void __launch_bounds__(decode_block_size)
if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

// Decode page data that is DELTA_LENGTH_BYTE_ARRAY packed. This encoding consists of a
// DELTA_BINARY_PACKED array of string lengths, followed by the string data.
//
// Launch layout: one thread block per page (`blockIdx.x` indexes `pages`), with at most
// `decode_block_size` threads per block. The main loop assigns work by warp: warp 0 decodes
// repetition/definition levels, warp 1 unpacks a mini-block of deltas, and warp 2 writes the
// string lengths to the output, so the block must contain at least three warps. Once all lengths
// are written, the whole block converts them to offsets and copies the string bytes.
//
// @tparam level_t Integer type used to hold repetition/definition level values
// @param pages Pages to decode; pages rejected by `setupLocalPageInfo` for the
// `DELTA_LENGTH_BA` mask are left untouched by this kernel
// @param chunks Column chunk descriptors, forwarded to `setupLocalPageInfo`
// @param min_row First row to decode
// @param num_rows Number of rows to decode
// @param error_code Location where any decode error is reported via `set_error`
template <typename level_t>
__global__ void __launch_bounds__(decode_block_size)
  gpuDecodeDeltaLengthByteArray(PageInfo* pages,
                                device_span<ColumnChunkDesc const> chunks,
                                size_t min_row,
                                size_t num_rows,
                                kernel_error::pointer error_code)
{
  using cudf::detail::warp_size;
  __shared__ __align__(16) delta_binary_decoder db_state;
  __shared__ __align__(16) page_state_s state_g;
  __shared__ __align__(16) page_state_buffers_s<delta_rolling_buf_size, 0, 0> state_buffers;
  __shared__ __align__(8) uint8_t const* page_string_data;
  __shared__ size_t string_offset;

  page_state_s* const s = &state_g;
  auto* const sb        = &state_buffers;
  int const page_idx    = blockIdx.x;
  int const t           = threadIdx.x;
  int const lane_id     = t % warp_size;
  auto* const db        = &db_state;
  [[maybe_unused]] null_count_back_copier _{s, t};

  auto const mask = decode_kernel_mask::DELTA_LENGTH_BA;
  if (!setupLocalPageInfo(s,
                          &pages[page_idx],
                          chunks,
                          min_row,
                          num_rows,
                          mask_filter{mask},
                          page_processing_stage::DECODE)) {
    return;
  }

  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;

  // copying logic from gpuDecodePageData.
  PageNestingDecodeInfo const* nesting_info_base = s->nesting_info;

  __shared__ level_t rep[delta_rolling_buf_size];  // circular buffer of repetition level values
  __shared__ level_t def[delta_rolling_buf_size];  // circular buffer of definition level values

  // skipped_leaf_values will always be 0 for flat hierarchies.
  uint32_t const skipped_leaf_values = s->page.skipped_leaf_values;

  // initialize delta state. the string data begins where the block of encoded lengths ends.
  if (t == 0) {
    string_offset    = 0;
    page_string_data = db->find_end_of_block(s->data_start, s->data_end);
  }
  __syncthreads();

  int const leaf_level_index = s->col.max_nesting_depth - 1;

  // sanity check to make sure we can process this page
  auto const batch_size = db->values_per_mb;
  if (batch_size > max_delta_mini_block_size) {
    set_error(static_cast<kernel_error::value_type>(decode_error::DELTA_PARAMS_UNSUPPORTED),
              error_code);
    return;
  }

  // if this is a bounds page, then we need to decode up to the first mini-block
  // that has a value we need, and set string_offset to the position of the first value in the
  // string data block.
  auto const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
  if (is_bounds_pg && s->page.start_val > 0) {
    if (t < warp_size) {
      // string_off is only valid on thread 0
      auto const string_off = db->skip_values_and_sum(s->page.start_val);
      if (t == 0) {
        string_offset = string_off;

        // if there is no repetition, then we need to work through the whole page, so reset the
        // delta decoder to the beginning of the page
        if (not has_repetition) { db->init_binary_block(s->data_start, s->data_end); }
      }
    }
    // the condition above is uniform across the block, so every thread reaches this barrier
    __syncthreads();
  }

  while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
    uint32_t target_pos;
    uint32_t const src_pos = s->src_pos;

    if (t < 2 * warp_size) {  // warp0..1
      target_pos = min(src_pos + 2 * batch_size, s->nz_count + batch_size);
    } else {  // warp2
      target_pos = min(s->nz_count, src_pos + batch_size);
    }
    // this needs to be here to prevent warp 2 modifying src_pos before all threads have read it
    __syncthreads();

    // warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
    // warp2 waits one cycle for warps 0/1 to produce a batch, and then stuffs string sizes
    // into the proper location in the output. warp 3 does nothing until it's time to copy
    // string data.
    if (t < warp_size) {
      // warp 0
      // decode repetition and definition levels.
      // - update validity vectors
      // - updates offsets (for nested columns)
      // - produces non-NULL value indices in s->nz_idx for subsequent decoding
      gpuDecodeLevels<delta_rolling_buf_size, level_t>(s, sb, target_pos, rep, def, t);
    } else if (t < 2 * warp_size) {
      // warp 1
      db->decode_batch();

    } else if (t < 3 * warp_size && src_pos < target_pos) {
      // warp 2
      // process the mini-block in batches of one warp
      for (uint32_t sp = src_pos + lane_id; sp < src_pos + batch_size; sp += warp_size) {
        // the position in the output column/buffer
        int dst_pos = sb->nz_idx[rolling_index<delta_rolling_buf_size>(sp)];

        // handle skip_rows here. flat hierarchies can just skip up to first_row.
        if (!has_repetition) { dst_pos -= s->first_row; }

        // fill in offsets array. at this point each slot holds a string length; the lengths are
        // converted to offsets after the loop.
        if (dst_pos >= 0 && sp < target_pos) {
          auto const offptr =
            reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out) + dst_pos;
          *offptr = db->value_at(sp + skipped_leaf_values);
        }
        __syncwarp();
      }

      if (lane_id == 0) { s->src_pos = src_pos + batch_size; }
    }
    __syncthreads();
  }

  // now turn array of lengths into offsets
  int value_count = nesting_info_base[leaf_level_index].value_count;

  // if no repetition we haven't calculated start/end bounds and instead just skipped
  // values until we reach first_row. account for that here.
  if (!has_repetition) { value_count -= s->first_row; }

  // NOTE(review): `s->page.str_offset` is presumably the starting offset of this page's string
  // data within the output column — confirm against block_excl_sum.
  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);

  // finally, copy the string data into place. `string_offset` is non-zero only when leading
  // values were skipped on a bounds page.
  auto const dst = nesting_info_base[leaf_level_index].string_out;
  auto const src = page_string_data + string_offset;
  memcpy_block<decode_block_size, true>(dst, src, s->page.str_bytes, t);

  if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
}

} // anonymous namespace

/**
* @copydoc cudf::io::parquet::detail::DecodeDeltaBinary
*/
void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

Expand All @@ -611,13 +770,13 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
/**
* @copydoc cudf::io::parquet::detail::DecodeDeltaByteArray
*/
void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
void DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
size_t min_row,
int level_type_size,
kernel_error::pointer error_code,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

Expand All @@ -633,4 +792,29 @@ void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pa
}
}

/**
 * @copydoc cudf::io::parquet::detail::DecodeDeltaLengthByteArray
 */
void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
                                cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
                                size_t num_rows,
                                size_t min_row,
                                int level_type_size,
                                kernel_error::pointer error_code,
                                rmm::cuda_stream_view stream)
{
  CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");

  dim3 const dim_block(decode_block_size, 1);
  dim3 const dim_grid(pages.size(), 1);  // 1 threadblock per page

  // `level_type_size` selects the integer type the kernel uses to hold repetition/definition
  // levels: a size of 1 instantiates the kernel with uint8_t, anything else with uint16_t.
  // note that the kernel takes `min_row` before `num_rows`, the reverse of this function's
  // parameter order.
  if (level_type_size == 1) {
    gpuDecodeDeltaLengthByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
      pages.device_ptr(), chunks, min_row, num_rows, error_code);
  } else {
    gpuDecodeDeltaLengthByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
      pages.device_ptr(), chunks, min_row, num_rows, error_code);
  }
}

} // namespace cudf::io::parquet::detail
Loading

0 comments on commit f1ff424

Please sign in to comment.