Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DELTA_BINARY_PACKED encoder for Parquet writer #14100

Merged
merged 62 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
91f7943
add DELTA_BINARY_PACKED encoder
etseidl Sep 13, 2023
478f8ac
clean up some consts and change kernel mask to enum class
etseidl Sep 13, 2023
cd4df51
more cleanup
etseidl Sep 13, 2023
2e33420
remove some FIXMEs
etseidl Sep 13, 2023
0bd64c8
change FIXME to TODO for TS scaling
etseidl Sep 13, 2023
fffc659
use switch rather than if/else block
etseidl Sep 13, 2023
a98b338
get rid of some magic numbers
etseidl Sep 13, 2023
9d1b2f2
replace operator overload with templated function
etseidl Sep 13, 2023
dc14171
Merge branch 'branch-23.10' into encode_delta_binary
etseidl Sep 13, 2023
39b498c
Merge branch 'rapidsai:branch-23.10' into encode_delta_binary
etseidl Sep 14, 2023
ea6f3c0
redo typing of the delta bit packer
etseidl Sep 15, 2023
2d41d3a
add const
etseidl Sep 15, 2023
1d94621
add test for delta binary writer
etseidl Sep 15, 2023
4e2c4a5
remove unsupported types from delta test
etseidl Sep 15, 2023
b3b25de
make second column unordered
etseidl Sep 15, 2023
ce5378f
Merge branch 'rapidsai:branch-23.10' into encode_delta_binary
etseidl Sep 15, 2023
00b248f
remove leftover experiment
etseidl Sep 18, 2023
1c75898
Merge branch 'encode_delta_binary' of github.com:etseidl/cudf into en…
etseidl Sep 18, 2023
dd526fa
Merge branch 'branch-23.10' into encode_delta_binary
PointKernel Sep 18, 2023
4c54ff6
Merge branch 'branch-23.10' into encode_delta_binary
etseidl Sep 20, 2023
8b95a78
change put_zz128 to void
etseidl Sep 21, 2023
24ca047
Merge branch 'branch-23.10' into encode_delta_binary
etseidl Sep 21, 2023
727762d
Merge branch 'rapidsai:branch-23.10' into encode_delta_binary
etseidl Sep 27, 2023
9d2aa84
Merge branch 'branch-23.12' into encode_delta_binary
etseidl Sep 27, 2023
71b8eca
Merge branch 'branch-23.12' into encode_delta_binary
etseidl Sep 27, 2023
5647bb2
Merge branch 'rapidsai:branch-23.10' into encode_delta_binary
etseidl Sep 28, 2023
f1b8314
Merge remote-tracking branch 'origin/branch-23.12' into encode_delta_…
etseidl Sep 28, 2023
4ad772c
Merge branch 'rapidsai:branch-23.10' into encode_delta_binary
etseidl Sep 28, 2023
1c22317
Merge branch 'branch-23.12' into encode_delta_binary
vuule Sep 28, 2023
0150236
fix typo
etseidl Sep 29, 2023
6477543
another typo
etseidl Sep 29, 2023
60df88e
implement suggestions from review
etseidl Sep 29, 2023
ac29dac
Merge branch 'branch-23.12' into encode_delta_binary
etseidl Sep 29, 2023
e6cc71e
use template foo to get a single BitAnd
etseidl Sep 29, 2023
7fff8f0
remove TODO
etseidl Sep 29, 2023
4eee6a0
Merge branch 'branch-23.12' into encode_delta_binary
etseidl Sep 29, 2023
8a20836
add an ifdef around is_scoped_enum
etseidl Sep 29, 2023
cf1289b
avoid UB when calculating deltas
etseidl Sep 29, 2023
068a017
remove unnecessary sync
etseidl Sep 30, 2023
6be661e
lost some constants somewhere
etseidl Sep 30, 2023
bd19be0
address some review comments
etseidl Oct 2, 2023
84f02a7
reduce register pressure a bit
etseidl Oct 2, 2023
806bca2
add some comments and sanity checks
etseidl Oct 2, 2023
ffe2d90
add test with sliced table
etseidl Oct 2, 2023
4f47bdd
make comment match reality
etseidl Oct 2, 2023
c60a4a9
remove some template declarations
etseidl Oct 2, 2023
9049b7d
implement suggestion from review
etseidl Oct 2, 2023
a29b70b
Merge branch 'encode_delta_binary' of github.com:etseidl/cudf into en…
etseidl Oct 2, 2023
2a9d5d9
remove another template param
etseidl Oct 2, 2023
8604b53
replace encoder struct with function
etseidl Oct 2, 2023
307017a
add sliced list test
etseidl Oct 2, 2023
dbbc58d
Merge remote-tracking branch 'origin/branch-23.12' into encode_delta_…
etseidl Oct 6, 2023
24ca06d
Merge remote-tracking branch 'origin/branch-23.12' into encode_delta_…
etseidl Oct 9, 2023
3db131b
finish merge
etseidl Oct 9, 2023
057dc00
Merge branch 'rapidsai:branch-23.12' into encode_delta_binary
etseidl Oct 10, 2023
d766012
Merge remote-tracking branch 'origin/branch-23.12' into encode_delta_…
etseidl Oct 10, 2023
44ec4ff
Merge branch 'branch-23.12' into encode_delta_binary
etseidl Oct 11, 2023
28ed028
Merge branch 'rapidsai:branch-23.12' into encode_delta_binary
etseidl Oct 16, 2023
0bc8169
Merge remote-tracking branch 'origin/branch-23.12' into encode_delta_…
etseidl Oct 18, 2023
5db6312
clean ups suggested in review
etseidl Oct 18, 2023
c1445ed
get rid of TODO
etseidl Oct 19, 2023
fab105a
Merge branch 'branch-23.12' into encode_delta_binary
etseidl Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions cpp/src/io/parquet/delta_binary.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,6 @@ namespace cudf::io::parquet::detail {
// encoded with DELTA_LENGTH_BYTE_ARRAY encoding, which is a DELTA_BINARY_PACKED list of suffix
// lengths, followed by the concatenated suffix data.

// TODO: The delta encodings use ULEB128 integers, but for now we're only
// using max 64 bits. Need to see what the performance impact is of using
// __int128_t rather than int64_t.
using uleb128_t = uint64_t;
using zigzag128_t = int64_t;

// we decode one mini-block at a time. max mini-block size seen is 64.
constexpr int delta_rolling_buf_size = 128;

Expand Down
290 changes: 290 additions & 0 deletions cpp/src/io/parquet/delta_enc.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "parquet_gpu.hpp"

#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/integer_utils.hpp>

#include <cub/cub.cuh>

namespace cudf::io::parquet::detail {

namespace delta {
vuule marked this conversation as resolved.
Show resolved Hide resolved

inline __device__ void put_uleb128(uint8_t*& p, uleb128_t v)
{
while (v > 0x7f) {
*p++ = v | 0x80;
vuule marked this conversation as resolved.
Show resolved Hide resolved
v >>= 7;
}
*p++ = v;
}

inline __device__ void put_zz128(uint8_t*& p, zigzag128_t v)
{
zigzag128_t s = (v < 0);
put_uleb128(p, (v ^ -s) * 2 + s);
}

// a block size of 128, with 4 mini-blocks of 32 values each fits nicely without consuming
// too much shared memory.
// the parquet spec requires block_size to be a multiple of 128, and values_per_mini_block
// to be a multiple of 32.
vuule marked this conversation as resolved.
Show resolved Hide resolved
constexpr int block_size = 128;
constexpr int num_mini_blocks = 4;
constexpr int values_per_mini_block = block_size / num_mini_blocks;
vuule marked this conversation as resolved.
Show resolved Hide resolved
constexpr int buffer_size = 2 * block_size;

// extra sanity checks to enforce compliance with the parquet specification
Copy link
Contributor

@ttnghia ttnghia Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It is cleaner to have any sentence (including comments) begins with a capitalized letter and ends with a period. Sorry if I'm nitpicking too much.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries...hitting caps is an extra keystroke and I'm lazy 😅

static_assert(block_size % 128 == 0);
static_assert(values_per_mini_block % 32 == 0);

using block_reduce = cub::BlockReduce<zigzag128_t, block_size>;
using warp_reduce = cub::WarpReduce<uleb128_t>;
using index_scan = cub::BlockScan<size_type, block_size>;

constexpr int rolling_idx(int index) { return rolling_index<buffer_size>(index); }

// version of bit packer that can handle up to 64 bits values.
// T is the type to use for processing. if nbits <= 32 use uint32_t, otherwise unsigned long long
// (not uint64_t because of atomicOr's typing). allowing this to be selectable since there's a
// measurable impact to using the wider types.
template <typename scratch_type>
inline __device__ void bitpack_mini_block(
uint8_t* dst, uleb128_t val, uint32_t count, uint8_t nbits, void* temp_space)
{
using wide_type =
std::conditional_t<std::is_same_v<scratch_type, unsigned long long>, __uint128_t, uint64_t>;
using cudf::detail::warp_size;
scratch_type constexpr mask = sizeof(scratch_type) * 8 - 1;
auto constexpr div = sizeof(scratch_type) * 8;

auto const lane_id = threadIdx.x % warp_size;
auto const warp_id = threadIdx.x / warp_size;

auto const scratch = reinterpret_cast<scratch_type*>(temp_space) + warp_id * warp_size;

// zero out scratch
scratch[lane_id] = 0;
__syncwarp();

// TODO: see if there is any savings using special packing for easy bitwidths (1,2,4,8,16...)
// like what's done for the RLE encoder.
if (nbits == div) {
if (lane_id < count) {
for (int i = 0; i < sizeof(scratch_type); i++) {
vuule marked this conversation as resolved.
Show resolved Hide resolved
dst[lane_id * sizeof(scratch_type) + i] = val & 0xff;
val >>= 8;
}
}
return;
}

if (lane_id <= count) {
// shift symbol left by up to mask bits
wide_type v2 = val;
v2 <<= (lane_id * nbits) & mask;

// Copy N bit word into two N/2 bit words while following C++ strict aliasing rules.
scratch_type v1[2];
memcpy(&v1, &v2, sizeof(wide_type));

// Atomically write result to scratch
if (v1[0]) { atomicOr(scratch + ((lane_id * nbits) / div), v1[0]); }
if (v1[1]) { atomicOr(scratch + ((lane_id * nbits) / div) + 1, v1[1]); }
}
__syncwarp();

// Copy scratch data to final destination
auto const available_bytes = util::div_rounding_up_safe(count * nbits, 8U);
auto const scratch_bytes = reinterpret_cast<uint8_t const*>(scratch);

for (uint32_t i = lane_id; i < available_bytes; i += warp_size) {
dst[i] = scratch_bytes[i];
}
__syncwarp();
}

} // namespace delta

// Object used to turn a stream of integers into a DELTA_BINARY_PACKED stream. This takes as input
// 128 values with validity at a time, saving them until there are enough values for a block
// to be written.
// T is the input data type (either zigzag128_t or uleb128_t)
template <typename T>
class delta_binary_packer {
private:
uint8_t* _dst; // sink to dump encoded values to
size_type _current_idx; // index of first value in buffer
uint32_t _num_values; // total number of values to encode
size_type _values_in_buffer; // current number of values stored in _buffer
T* _buffer; // buffer to store values to be encoded
uint8_t _mb_bits[delta::num_mini_blocks]; // bitwidth for each mini-block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you group pointers and values please?


// pointers to shared scratch memory for the warp and block scans/reduces
delta::index_scan::TempStorage* _scan_tmp;
delta::warp_reduce::TempStorage* _warp_tmp;
delta::block_reduce::TempStorage* _block_tmp;

void* _bitpack_tmp; // pointer to shared scratch memory used in bitpacking

// write the delta binary header. only call from thread 0
inline __device__ void write_header()
{
delta::put_uleb128(_dst, delta::block_size);
delta::put_uleb128(_dst, delta::num_mini_blocks);
delta::put_uleb128(_dst, _num_values);
delta::put_zz128(_dst, _buffer[0]);
}

// write the block header. only call from thread 0
inline __device__ void write_block_header(zigzag128_t block_min)
{
delta::put_zz128(_dst, block_min);
memcpy(_dst, _mb_bits, 4);
_dst += 4;
}

// signed subtraction with defined wrapping behavior
inline __device__ zigzag128_t subtract(zigzag128_t a, zigzag128_t b)
{
return static_cast<zigzag128_t>(static_cast<uleb128_t>(a) - static_cast<uleb128_t>(b));
}

public:
inline __device__ auto num_values() const { return _num_values; }

// initialize the object. only call from thread 0
inline __device__ void init(uint8_t* dest, uint32_t num_values, T* buffer, void* temp_storage)
{
_dst = dest;
_num_values = num_values;
_buffer = buffer;
_scan_tmp = reinterpret_cast<delta::index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<delta::warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<delta::block_reduce::TempStorage*>(temp_storage);
_bitpack_tmp = _buffer + delta::buffer_size;
_current_idx = 0;
_values_in_buffer = 0;
}

// each thread calls this to add its current value
inline __device__ void add_value(T value, bool is_valid)
{
// figure out the correct position for the given value
size_type const valid = is_valid;
size_type pos;
size_type num_valid;
delta::index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);

if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; }
__syncthreads();

if (threadIdx.x == 0) {
_values_in_buffer += num_valid;
// if first pass write header
if (_current_idx == 0) {
write_header();
_current_idx = 1;
_values_in_buffer -= 1;
}
}
__syncthreads();

if (_values_in_buffer >= delta::block_size) { flush(); }
}

// called by each thread to flush data to the sink.
inline __device__ uint8_t const* flush()
{
using cudf::detail::warp_size;
__shared__ zigzag128_t block_min;

int const t = threadIdx.x;
int const warp_id = t / warp_size;
int const lane_id = t % warp_size;

if (_values_in_buffer <= 0) { return _dst; }

// calculate delta for this thread
size_type const idx = _current_idx + t;
zigzag128_t const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<zigzag128_t>::max();

// find min delta for the block
auto const min_delta = delta::block_reduce(*_block_tmp).Reduce(delta, cub::Min());

if (t == 0) { block_min = min_delta; }
__syncthreads();

// compute frame of reference for the block
uleb128_t const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;

// get max normalized delta for each warp, and use that to determine how many bits to use
// for the bitpacking of this warp
zigzag128_t const warp_max =
delta::warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
__syncwarp();

if (lane_id == 0) { _mb_bits[warp_id] = sizeof(zigzag128_t) * 8 - __clzll(warp_max); }
__syncthreads();

// write block header
if (t == 0) { write_block_header(block_min); }
__syncthreads();

// now each warp encodes its data...can calculate starting offset with _mb_bits
// NOTE: using a switch here rather than a loop because the compiler produces code that
// uses fewer registers.
int cumulative_bits = 0;
switch (warp_id) {
case 3: cumulative_bits += _mb_bits[2]; [[fallthrough]];
case 2: cumulative_bits += _mb_bits[1]; [[fallthrough]];
case 1: cumulative_bits += _mb_bits[0];
}
uint8_t* const mb_ptr = _dst + cumulative_bits * delta::values_per_mini_block / 8;

// encoding happens here
auto const warp_idx = _current_idx + warp_id * delta::values_per_mini_block;
if (warp_idx < _num_values) {
auto const num_enc = min(delta::values_per_mini_block, _num_values - warp_idx);
if (_mb_bits[warp_id] > 32) {
delta::bitpack_mini_block<unsigned long long>(
mb_ptr, norm_delta, num_enc, _mb_bits[warp_id], _bitpack_tmp);
} else {
delta::bitpack_mini_block<uint32_t>(
mb_ptr, norm_delta, num_enc, _mb_bits[warp_id], _bitpack_tmp);
}
}
__syncthreads();

// last warp updates global delta ptr
if (warp_id == delta::num_mini_blocks - 1 && lane_id == 0) {
_dst = mb_ptr + _mb_bits[warp_id] * delta::values_per_mini_block / 8;
_current_idx = min(warp_idx + delta::values_per_mini_block, _num_values);
_values_in_buffer = max(_values_in_buffer - delta::block_size, 0U);
}
__syncthreads();

return _dst;
}
};

} // namespace cudf::io::parquet::detail
Loading