Skip to content

Commit

Permalink
Limit DELTA_BINARY_PACKED encoder to the same number of bits as the p…
Browse files Browse the repository at this point in the history
…hysical type being encoded (rapidsai#14392)

The current implementation of the DELTA_BINARY_PACKED encoder can in certain circumstances use more bits for encoding than are present in the data. Specifically, for INT32 data, if the range of values is large enough, the encoder will use up to 33 bits. While not a violation of the Parquet specification, this does cause problems with certain parquet readers (see [this](apache/arrow#20374) arrow-cpp issue, for instance). libcudf and parquet-mr have no issue with reading data encoded with 33 bits, but in the interest of a) greater interoperability and b) smaller files, this PR changes the DELTA_BINARY_PACKED encoder to use no more bits than are present in the physical type being encoded (32 for INT32, 64 for INT64).

The actual change made here is to perform all the delta computations using 32-bit integers for INT32 data, and 64-bit integers for INT64 data. The prior encoder used 64-bit integers exclusively. To deal with possible overflow, all math is done on unsigned integers to get defined wrapping behavior (overflow with signed integers is UB), with results cast back to signed afterwards. This is in line with the approach taken by parquet-mr, arrow-cpp, and arrow-rs.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Bradley Dice (https://github.com/bdice)

URL: rapidsai#14392
  • Loading branch information
etseidl authored and karthikeyann committed Dec 12, 2023
1 parent 6fb609a commit 153b914
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 52 deletions.
49 changes: 25 additions & 24 deletions cpp/src/io/parquet/delta_enc.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ constexpr int buffer_size = 2 * block_size;
static_assert(block_size % 128 == 0);
static_assert(values_per_mini_block % 32 == 0);

using block_reduce = cub::BlockReduce<zigzag128_t, block_size>;
using warp_reduce = cub::WarpReduce<uleb128_t>;
using index_scan = cub::BlockScan<size_type, block_size>;

constexpr int rolling_idx(int index) { return rolling_index<buffer_size>(index); }

// Version of bit packer that can handle up to 64 bits values.
Expand Down Expand Up @@ -128,9 +124,15 @@ inline __device__ void bitpack_mini_block(
// Object used to turn a stream of integers into a DELTA_BINARY_PACKED stream. This takes as input
// 128 values with validity at a time, saving them until there are enough values for a block
// to be written.
// T is the input data type (either zigzag128_t or uleb128_t).
// T is the input data type (either int32_t or int64_t).
template <typename T>
class delta_binary_packer {
public:
using U = std::make_unsigned_t<T>;
using block_reduce = cub::BlockReduce<T, delta::block_size>;
using warp_reduce = cub::WarpReduce<U>;
using index_scan = cub::BlockScan<size_type, delta::block_size>;

private:
uint8_t* _dst; // sink to dump encoded values to
T* _buffer; // buffer to store values to be encoded
Expand All @@ -140,9 +142,9 @@ class delta_binary_packer {
uint8_t _mb_bits[delta::num_mini_blocks]; // bitwidth for each mini-block

// pointers to shared scratch memory for the warp and block scans/reduces
delta::index_scan::TempStorage* _scan_tmp;
delta::warp_reduce::TempStorage* _warp_tmp;
delta::block_reduce::TempStorage* _block_tmp;
index_scan::TempStorage* _scan_tmp;
typename warp_reduce::TempStorage* _warp_tmp;
typename block_reduce::TempStorage* _block_tmp;

void* _bitpack_tmp; // pointer to shared scratch memory used in bitpacking

Expand All @@ -164,9 +166,9 @@ class delta_binary_packer {
}

// Signed subtraction with defined wrapping behavior.
inline __device__ zigzag128_t subtract(zigzag128_t a, zigzag128_t b)
inline __device__ T subtract(T a, T b)
{
return static_cast<zigzag128_t>(static_cast<uleb128_t>(a) - static_cast<uleb128_t>(b));
return static_cast<T>(static_cast<U>(a) - static_cast<U>(b));
}

public:
Expand All @@ -178,9 +180,9 @@ class delta_binary_packer {
_dst = dest;
_num_values = num_values;
_buffer = buffer;
_scan_tmp = reinterpret_cast<delta::index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<delta::warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<delta::block_reduce::TempStorage*>(temp_storage);
_scan_tmp = reinterpret_cast<index_scan::TempStorage*>(temp_storage);
_warp_tmp = reinterpret_cast<typename warp_reduce::TempStorage*>(temp_storage);
_block_tmp = reinterpret_cast<typename block_reduce::TempStorage*>(temp_storage);
_bitpack_tmp = _buffer + delta::buffer_size;
_current_idx = 0;
_values_in_buffer = 0;
Expand All @@ -193,7 +195,7 @@ class delta_binary_packer {
size_type const valid = is_valid;
size_type pos;
size_type num_valid;
delta::index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);
index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid);

if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; }
__syncthreads();
Expand All @@ -216,7 +218,7 @@ class delta_binary_packer {
inline __device__ uint8_t const* flush()
{
using cudf::detail::warp_size;
__shared__ zigzag128_t block_min;
__shared__ T block_min;

int const t = threadIdx.x;
int const warp_id = t / warp_size;
Expand All @@ -225,27 +227,26 @@ class delta_binary_packer {
if (_values_in_buffer <= 0) { return _dst; }

// Calculate delta for this thread.
size_type const idx = _current_idx + t;
zigzag128_t const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<zigzag128_t>::max();
size_type const idx = _current_idx + t;
T const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)],
_buffer[delta::rolling_idx(idx - 1)])
: std::numeric_limits<T>::max();

// Find min delta for the block.
auto const min_delta = delta::block_reduce(*_block_tmp).Reduce(delta, cub::Min());
auto const min_delta = block_reduce(*_block_tmp).Reduce(delta, cub::Min());

if (t == 0) { block_min = min_delta; }
__syncthreads();

// Compute frame of reference for the block.
uleb128_t const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;
U const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0;

// Get max normalized delta for each warp, and use that to determine how many bits to use
// for the bitpacking of this warp.
zigzag128_t const warp_max =
delta::warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
U const warp_max = warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max());
__syncwarp();

if (lane_id == 0) { _mb_bits[warp_id] = sizeof(zigzag128_t) * 8 - __clzll(warp_max); }
if (lane_id == 0) { _mb_bits[warp_id] = sizeof(long long) * 8 - __clzll(warp_max); }
__syncthreads();

// write block header
Expand Down
32 changes: 17 additions & 15 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,15 @@ struct BitwiseOr {
}
};

// I is the column type from the input table
template <typename I>
// PT is the parquet physical type (INT32 or INT64).
// I is the column type from the input table.
template <Type PT, typename I>
__device__ uint8_t const* delta_encode(page_enc_state_s<0>* s,
uint32_t valid_count,
uint64_t* buffer,
void* temp_space)
{
using output_type = std::conditional_t<std::is_signed_v<I>, zigzag128_t, uleb128_t>;
using output_type = std::conditional_t<PT == INT32, int32_t, int64_t>;
__shared__ delta_binary_packer<output_type> packer;

auto const t = threadIdx.x;
Expand Down Expand Up @@ -1711,9 +1712,10 @@ __global__ void __launch_bounds__(block_size, 8)
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
__shared__ union {
typename block_reduce::TempStorage reduce_storage;
typename delta::index_scan::TempStorage delta_index_tmp;
typename delta::block_reduce::TempStorage delta_reduce_tmp;
typename delta::warp_reduce::TempStorage delta_warp_red_tmp[delta::num_mini_blocks];
typename delta_binary_packer<uleb128_t>::index_scan::TempStorage delta_index_tmp;
typename delta_binary_packer<uleb128_t>::block_reduce::TempStorage delta_reduce_tmp;
typename delta_binary_packer<uleb128_t>::warp_reduce::TempStorage
delta_warp_red_tmp[delta::num_mini_blocks];
} temp_storage;

auto* const s = &state_g;
Expand Down Expand Up @@ -1784,40 +1786,40 @@ __global__ void __launch_bounds__(block_size, 8)
switch (dtype_len_in) {
case 8: {
// only DURATIONS map to 8 bytes, so safe to just use signed here?
delta_ptr = delta_encode<int64_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int64_t>(s, valid_count, delta_shared, &temp_storage);
break;
}
case 4: {
if (type_id == type_id::UINT32) {
delta_ptr = delta_encode<uint32_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, uint32_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int32_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int32_t>(s, valid_count, delta_shared, &temp_storage);
}
break;
}
case 2: {
if (type_id == type_id::UINT16) {
delta_ptr = delta_encode<uint16_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, uint16_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int16_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int16_t>(s, valid_count, delta_shared, &temp_storage);
}
break;
}
case 1: {
if (type_id == type_id::UINT8) {
delta_ptr = delta_encode<uint8_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, uint8_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int8_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT32, int8_t>(s, valid_count, delta_shared, &temp_storage);
}
break;
}
default: CUDF_UNREACHABLE("invalid dtype_len_in when encoding DELTA_BINARY_PACKED");
}
} else {
if (type_id == type_id::UINT64) {
delta_ptr = delta_encode<uint64_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT64, uint64_t>(s, valid_count, delta_shared, &temp_storage);
} else {
delta_ptr = delta_encode<int64_t>(s, valid_count, delta_shared, &temp_storage);
delta_ptr = delta_encode<INT64, int64_t>(s, valid_count, delta_shared, &temp_storage);
}
}

Expand Down
16 changes: 3 additions & 13 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ def delta_num_rows():
return [1, 2, 23, 32, 33, 34, 64, 65, 66, 128, 129, 130, 20000, 50000]


@pytest.mark.parametrize("nrows", [1, 100000])
@pytest.mark.parametrize("nrows", delta_num_rows())
@pytest.mark.parametrize("add_nulls", [True, False])
@pytest.mark.parametrize(
"dtype",
Expand Down Expand Up @@ -1346,18 +1346,8 @@ def test_delta_binary(nrows, add_nulls, dtype, tmpdir):
use_dictionary=False,
)

# FIXME(ets): should probably not use more bits than the data type
try:
cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
except OSError as e:
if dtype == "int32" and nrows == 100000:
pytest.mark.xfail(
reason="arrow does not support 33-bit delta encoding"
)
else:
raise e
else:
assert_eq(cdf2, cdf)
cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
assert_eq(cdf2, cdf)


@pytest.mark.parametrize("nrows", delta_num_rows())
Expand Down

0 comments on commit 153b914

Please sign in to comment.