From d97b3e091778987562508612d216a36207f5cd7c Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 5 Dec 2023 18:31:35 -0800 Subject: [PATCH] Limit DELTA_BINARY_PACKED encoder to the same number of bits as the physical type being encoded (#14392) The current implementation of the DELTA_BINARY_PACKED encoder can in certain circumstances use more bits for encoding than are present in the data. Specifically, for INT32 data, if the range of values is large enough, the encoder will use up to 33 bits. While not a violation of the Parquet specification, this does cause problems with certain parquet readers (see [this](https://github.com/apache/arrow/issues/20374) arrow-cpp issue, for instance). libcudf and parquet-mr have no issue with reading data encoded with 33 bits, but in the interest of a) greater interoperability and b) smaller files, this PR changes the DELTA_BINARY_PACKED encoder to use no more bits than are present in the physical type being encoded (32 for INT32, 64 for INT64). The actual change made here is to perform all the delta computations using 32-bit integers for INT32 data, and 64-bit integers for INT64 data. The prior encoder used 64-bit integers exclusively. To deal with possible overflow, all math is done on unsigned integers to get defined wrapping behavior (overflow with signed integers is UB), with results cast back to signed afterwards. This is in line with the approach taken by parquet-mr, arrow-cpp, and arrow-rs. Authors: - Ed Seidl (https://github.com/etseidl) - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Yunsong Wang (https://github.com/PointKernel) - Mike Wilson (https://github.com/hyperbolic2346) - Bradley Dice (https://github.com/bdice) URL: https://github.com/rapidsai/cudf/pull/14392 --- cpp/src/io/parquet/delta_enc.cuh | 49 +++++++++++++------------- cpp/src/io/parquet/page_enc.cu | 32 +++++++++-------- python/cudf/cudf/tests/test_parquet.py | 16 ++------- 3 files changed, 45 insertions(+), 52 deletions(-) diff --git a/cpp/src/io/parquet/delta_enc.cuh b/cpp/src/io/parquet/delta_enc.cuh index b0a7493fcab..cbb44d30a56 100644 --- a/cpp/src/io/parquet/delta_enc.cuh +++ b/cpp/src/io/parquet/delta_enc.cuh @@ -57,10 +57,6 @@ constexpr int buffer_size = 2 * block_size; static_assert(block_size % 128 == 0); static_assert(values_per_mini_block % 32 == 0); -using block_reduce = cub::BlockReduce; -using warp_reduce = cub::WarpReduce; -using index_scan = cub::BlockScan; - constexpr int rolling_idx(int index) { return rolling_index(index); } // Version of bit packer that can handle up to 64 bits values. @@ -128,9 +124,15 @@ inline __device__ void bitpack_mini_block( // Object used to turn a stream of integers into a DELTA_BINARY_PACKED stream. This takes as input // 128 values with validity at a time, saving them until there are enough values for a block // to be written. -// T is the input data type (either zigzag128_t or uleb128_t). +// T is the input data type (either int32_t or int64_t). template class delta_binary_packer { + public: + using U = std::make_unsigned_t; + using block_reduce = cub::BlockReduce; + using warp_reduce = cub::WarpReduce; + using index_scan = cub::BlockScan; + private: uint8_t* _dst; // sink to dump encoded values to T* _buffer; // buffer to store values to be encoded @@ -140,9 +142,9 @@ class delta_binary_packer { uint8_t _mb_bits[delta::num_mini_blocks]; // bitwidth for each mini-block // pointers to shared scratch memory for the warp and block scans/reduces - delta::index_scan::TempStorage* _scan_tmp; - delta::warp_reduce::TempStorage* _warp_tmp; - delta::block_reduce::TempStorage* _block_tmp; + index_scan::TempStorage* _scan_tmp; + typename warp_reduce::TempStorage* _warp_tmp; + typename block_reduce::TempStorage* _block_tmp; void* _bitpack_tmp; // pointer to shared scratch memory used in bitpacking @@ -164,9 +166,9 @@ class delta_binary_packer { } // Signed subtraction with defined wrapping behavior. - inline __device__ zigzag128_t subtract(zigzag128_t a, zigzag128_t b) + inline __device__ T subtract(T a, T b) { - return static_cast(static_cast(a) - static_cast(b)); + return static_cast(static_cast(a) - static_cast(b)); } public: @@ -178,9 +180,9 @@ class delta_binary_packer { _dst = dest; _num_values = num_values; _buffer = buffer; - _scan_tmp = reinterpret_cast(temp_storage); - _warp_tmp = reinterpret_cast(temp_storage); - _block_tmp = reinterpret_cast(temp_storage); + _scan_tmp = reinterpret_cast(temp_storage); + _warp_tmp = reinterpret_cast(temp_storage); + _block_tmp = reinterpret_cast(temp_storage); _bitpack_tmp = _buffer + delta::buffer_size; _current_idx = 0; _values_in_buffer = 0; @@ -193,7 +195,7 @@ class delta_binary_packer { size_type const valid = is_valid; size_type pos; size_type num_valid; - delta::index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid); + index_scan(*_scan_tmp).ExclusiveSum(valid, pos, num_valid); if (is_valid) { _buffer[delta::rolling_idx(pos + _current_idx + _values_in_buffer)] = value; } __syncthreads(); @@ -216,7 +218,7 @@ class delta_binary_packer { inline __device__ uint8_t const* flush() { using cudf::detail::warp_size; - __shared__ zigzag128_t block_min; + __shared__ T block_min; int const t = threadIdx.x; int const warp_id = t / warp_size; @@ -225,27 +227,26 @@ class delta_binary_packer { if (_values_in_buffer <= 0) { return _dst; } // Calculate delta for this thread. - size_type const idx = _current_idx + t; - zigzag128_t const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)], - _buffer[delta::rolling_idx(idx - 1)]) - : std::numeric_limits::max(); + size_type const idx = _current_idx + t; + T const delta = idx < _num_values ? subtract(_buffer[delta::rolling_idx(idx)], + _buffer[delta::rolling_idx(idx - 1)]) + : std::numeric_limits::max(); // Find min delta for the block. - auto const min_delta = delta::block_reduce(*_block_tmp).Reduce(delta, cub::Min()); + auto const min_delta = block_reduce(*_block_tmp).Reduce(delta, cub::Min()); if (t == 0) { block_min = min_delta; } __syncthreads(); // Compute frame of reference for the block. - uleb128_t const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0; + U const norm_delta = idx < _num_values ? subtract(delta, block_min) : 0; // Get max normalized delta for each warp, and use that to determine how many bits to use // for the bitpacking of this warp. - zigzag128_t const warp_max = - delta::warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max()); + U const warp_max = warp_reduce(_warp_tmp[warp_id]).Reduce(norm_delta, cub::Max()); __syncwarp(); - if (lane_id == 0) { _mb_bits[warp_id] = sizeof(zigzag128_t) * 8 - __clzll(warp_max); } + if (lane_id == 0) { _mb_bits[warp_id] = sizeof(long long) * 8 - __clzll(warp_max); } __syncthreads(); // write block header diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index d75608930d5..ba751548e3f 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -251,14 +251,15 @@ struct BitwiseOr { } }; -// I is the column type from the input table -template +// PT is the parquet physical type (INT32 or INT64). +// I is the column type from the input table. +template __device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, uint32_t valid_count, uint64_t* buffer, void* temp_space) { - using output_type = std::conditional_t, zigzag128_t, uleb128_t>; + using output_type = std::conditional_t; __shared__ delta_binary_packer packer; auto const t = threadIdx.x; @@ -1711,9 +1712,10 @@ __global__ void __launch_bounds__(block_size, 8) using block_reduce = cub::BlockReduce; __shared__ union { typename block_reduce::TempStorage reduce_storage; - typename delta::index_scan::TempStorage delta_index_tmp; - typename delta::block_reduce::TempStorage delta_reduce_tmp; - typename delta::warp_reduce::TempStorage delta_warp_red_tmp[delta::num_mini_blocks]; + typename delta_binary_packer::index_scan::TempStorage delta_index_tmp; + typename delta_binary_packer::block_reduce::TempStorage delta_reduce_tmp; + typename delta_binary_packer::warp_reduce::TempStorage + delta_warp_red_tmp[delta::num_mini_blocks]; } temp_storage; auto* const s = &state_g; @@ -1784,30 +1786,30 @@ __global__ void __launch_bounds__(block_size, 8) switch (dtype_len_in) { case 8: { // only DURATIONS map to 8 bytes, so safe to just use signed here? - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); break; } case 4: { if (type_id == type_id::UINT32) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } break; } case 2: { if (type_id == type_id::UINT16) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } break; } case 1: { if (type_id == type_id::UINT8) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } break; } @@ -1815,9 +1817,9 @@ __global__ void __launch_bounds__(block_size, 8) } } else { if (type_id == type_id::UINT64) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); } } diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index b3a06dbd742..c5da03d2942 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1293,7 +1293,7 @@ def delta_num_rows(): return [1, 2, 23, 32, 33, 34, 64, 65, 66, 128, 129, 130, 20000, 50000] -@pytest.mark.parametrize("nrows", [1, 100000]) +@pytest.mark.parametrize("nrows", delta_num_rows()) @pytest.mark.parametrize("add_nulls", [True, False]) @pytest.mark.parametrize( "dtype", @@ -1346,18 +1346,8 @@ def test_delta_binary(nrows, add_nulls, dtype, tmpdir): use_dictionary=False, ) - # FIXME(ets): should probably not use more bits than the data type - try: - cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) - except OSError as e: - if dtype == "int32" and nrows == 100000: - pytest.mark.xfail( - reason="arrow does not support 33-bit delta encoding" - ) - else: - raise e - else: - assert_eq(cdf2, cdf) + cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname)) + assert_eq(cdf2, cdf) @pytest.mark.parametrize("nrows", delta_num_rows())