Skip to content

Commit

Permalink
Compression: support FOR & replace Delta with DeltaFOR (#8983)
Browse files Browse the repository at this point in the history
ref #8982
  • Loading branch information
Lloyd-Pottiger authored May 8, 2024
1 parent db0bfc9 commit 2251037
Show file tree
Hide file tree
Showing 14 changed files with 566 additions and 208 deletions.
2 changes: 2 additions & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ target_include_directories(cpptoml INTERFACE
SET (BENCHMARK_ENABLE_TESTING OFF CACHE BOOL "Disable google-benchmark testing" FORCE)
SET (BENCHMARK_ENABLE_GTEST_TESTS OFF CACHE BOOL "Disable google-benchmark testing" FORCE)
add_subdirectory(benchmark)
target_compile_options(benchmark PRIVATE "-Wno-error=thread-safety-analysis")
target_no_warning(benchmark thread-safety-analysis)

set (BUILD_TESTING OFF CACHE BOOL "Disable cpu-features testing" FORCE)
if (NOT (OS_DARWIN AND ARCH_AARCH64))
Expand Down
3 changes: 2 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ check_then_add_sources_compile_flag (
src/Columns/ColumnVector.cpp
src/DataTypes/DataTypeString.cpp
src/Interpreters/Join.cpp
src/IO/Compression/CompressionCodecDelta.cpp
src/IO/Compression/CompressionCodecFOR.cpp
src/IO/Compression/CompressionCodecDeltaFOR.cpp
src/Storages/DeltaMerge/BitmapFilter/BitmapFilter.cpp
src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.cpp
)
Expand Down
10 changes: 7 additions & 3 deletions dbms/src/Common/BitpackingPrimitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class BitpackingPrimitives
UInt8 width,
bool skip_sign_extension = false)
{
if (width == 0)
{
memset(dst, 0, count * sizeof(T));
return;
}
for (size_t i = 0; i < count; i += BITPACKING_ALGORITHM_GROUP_SIZE)
{
unPackGroup<T>(dst + i * sizeof(T), src + (i * width) / 8, width, skip_sign_extension);
Expand Down Expand Up @@ -125,13 +130,12 @@ class BitpackingPrimitives
}

// round up to nearest multiple of BITPACKING_ALGORITHM_GROUP_SIZE
template <typename T>
constexpr static T roundUpToAlgorithmGroupSize(T num_to_round)
constexpr static size_t roundUpToAlgorithmGroupSize(size_t num_to_round)
{
static_assert(
(BITPACKING_ALGORITHM_GROUP_SIZE & (BITPACKING_ALGORITHM_GROUP_SIZE - 1)) == 0,
"BITPACKING_ALGORITHM_GROUP_SIZE must be a power of 2");
constexpr T mask = BITPACKING_ALGORITHM_GROUP_SIZE - 1;
constexpr size_t mask = BITPACKING_ALGORITHM_GROUP_SIZE - 1;
return (num_to_round + mask) & ~mask;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/BitpackingPrimitives.h>
#include <Common/Exception.h>
#include <DataTypes/IDataType.h>
#include <IO/Compression/CompressionCodecDelta.h>
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <common/likely.h>
#include <common/unaligned.h>


#if defined(__AVX2__)
#include <immintrin.h>
#endif
Expand All @@ -32,80 +34,76 @@ extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

CompressionCodecDelta::CompressionCodecDelta(UInt8 delta_bytes_size_)
: delta_bytes_size(delta_bytes_size_)
CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(UInt8 bytes_size_)
: bytes_size(bytes_size_)
{}

UInt8 CompressionCodecDelta::getMethodByte() const
UInt8 CompressionCodecDeltaFOR::getMethodByte() const
{
return static_cast<UInt8>(CompressionMethodByte::Delta);
return static_cast<UInt8>(CompressionMethodByte::DeltaFOR);
}

namespace
UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
template <typename T>
void compressData(const char * source, UInt32 source_size, char * dest)
/**
*|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
*|1 bytes |bytes_size |sizeof(UInt8)|required size |
*/
const size_t count = uncompressed_size / bytes_size;
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
}

namespace
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_COMPRESS,
"Cannot compress with Delta codec, data size {} is not aligned to {}",
source_size,
sizeof(T));

T prev_src = 0;
const char * const source_end = source + source_size;
while (source < source_end)
template <std::integral T>
void DeltaEncode(const T * source, UInt32 count, T * dest)
{
T prev = 0;
for (UInt32 i = 0; i < count; ++i)
{
T curr_src = unalignedLoad<T>(source);
unalignedStore<T>(dest, curr_src - prev_src);
prev_src = curr_src;

source += sizeof(T);
dest += sizeof(T);
T curr = source[i];
dest[i] = curr - prev;
prev = curr;
}
}

template <typename T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
template <std::integral T>
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
if (source_size % sizeof(T) != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress Delta-encoded data, data size {} is not aligned to {}",
source_size,
sizeof(T));
const auto count = source_size / sizeof(T);
DeltaEncode<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
// Cast deltas to signed type to better compress negative values.
using TS = typename std::make_signed<T>::type;
return CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), count, dest);
}

const char * const output_end = dest + output_size;
template <std::integral T>
void ordinaryDeltaDecode(const char * source, UInt32 source_size, char * dest)
{
T accumulator{};
const char * const source_end = source + source_size;
while (source < source_end)
{
accumulator += unalignedLoad<T>(source);
if unlikely (dest + sizeof(accumulator) > output_end)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data");
unalignedStore<T>(dest, accumulator);

source += sizeof(T);
dest += sizeof(T);
}
}

template <typename T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
template <std::integral T>
void DeltaDecode(const char * source, UInt32 source_size, char * dest)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(source, source_size, dest);
}

#if defined(__AVX2__)
// Note: using SIMD to rewrite compress does not improve performance.

template <>
void decompressData<UInt32>(
const char * __restrict__ raw_source,
UInt32 raw_source_size,
char * __restrict__ raw_dest,
UInt32 /*raw_output_size*/)
void DeltaDecode<UInt32>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt32 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt32);
Expand All @@ -129,11 +127,7 @@ void decompressData<UInt32>(
}

template <>
void decompressData<UInt64>(
const char * __restrict__ raw_source,
UInt32 raw_source_size,
char * __restrict__ raw_dest,
UInt32 /*raw_output_size*/)
void DeltaDecode<UInt64>(const char * __restrict__ raw_source, UInt32 raw_source_size, char * __restrict__ raw_dest)
{
const auto * source = reinterpret_cast<const UInt64 *>(raw_source);
auto source_size = raw_source_size / sizeof(UInt64);
Expand Down Expand Up @@ -168,46 +162,77 @@ void decompressData<UInt64>(

#endif

template <std::integral T>
void ordinaryDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
using TS = typename std::make_signed<T>::type;
CompressionCodecFOR::decompressData<TS>(source, source_size, dest, output_size);
ordinaryDeltaDecode<T>(dest, output_size, dest);
}

template <std::integral T>
void decompressData(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
ordinaryDecompressData<T>(source, source_size, dest, output_size);
}

template <>
void decompressData<UInt32>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt32);
auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt32);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int32>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt32>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
}

template <>
void decompressData<UInt64>(const char * source, UInt32 source_size, char * dest, UInt32 output_size)
{
const auto count = output_size / sizeof(UInt64);
const auto round_size = BitpackingPrimitives::roundUpToAlgorithmGroupSize(count);
// Reserve enough space for the temporary buffer.
const auto required_size = round_size * sizeof(UInt64);
char tmp_buffer[required_size];
CompressionCodecFOR::decompressData<Int64>(source, source_size, tmp_buffer, required_size);
DeltaDecode<UInt64>(reinterpret_cast<const char *>(tmp_buffer), output_size, dest);
}

} // namespace

UInt32 CompressionCodecDelta::doCompressData(const char * source, UInt32 source_size, char * dest) const
UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
if unlikely (source_size % delta_bytes_size != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"source size {} is not aligned to {}",
source_size,
delta_bytes_size);
dest[0] = delta_bytes_size;
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
dest[0] = bytes_size;
size_t start_pos = 1;
switch (delta_bytes_size)
switch (bytes_size)
{
case 1:
compressData<UInt8>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt8>(source, source_size, &dest[start_pos]);
case 2:
compressData<UInt16>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt16>(source, source_size, &dest[start_pos]);
case 4:
compressData<UInt32>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt32>(source, source_size, &dest[start_pos]);
case 8:
compressData<UInt64>(source, source_size, &dest[start_pos]);
break;
return 1 + compressData<UInt64>(source, source_size, &dest[start_pos]);
default:
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress Delta-encoded data. Unsupported bytes size");
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress DeltaFor-encoded data. Unsupported bytes size");
}
return 1 + source_size;
}

void CompressionCodecDelta::doDecompressData(
void CompressionCodecDeltaFOR::doDecompressData(
const char * source,
UInt32 source_size,
char * dest,
UInt32 uncompressed_size) const
{
if unlikely (source_size < 2)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. File has wrong header");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

if (uncompressed_size == 0)
return;
Expand All @@ -219,32 +244,39 @@ void CompressionCodecDelta::doDecompressData(
"uncompressed size {} is not aligned to {}",
uncompressed_size,
bytes_size);
UInt32 output_size = uncompressed_size;

UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
{
case 1:
decompressData<UInt8>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
decompressData<UInt16>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
decompressData<UInt32>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
decompressData<UInt64>(&source[1], source_size_no_header, dest, output_size);
decompressData<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
}
}

void CompressionCodecDelta::ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 dest_size)
void CompressionCodecDeltaFOR::ordinaryDecompress(
const char * source,
UInt32 source_size,
char * dest,
UInt32 dest_size)
{
if unlikely (source_size < 2)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress delta-encoded data. File has wrong header");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

if (dest_size == 0)
return;
Expand Down Expand Up @@ -273,7 +305,9 @@ void CompressionCodecDelta::ordinaryDecompress(const char * source, UInt32 sourc
ordinaryDecompressData<UInt64>(&source[1], source_size_no_header, dest, dest_size);
break;
default:
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "Cannot decompress Delta-encoded data. Unsupported bytes size");
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,10 +19,10 @@
namespace DB
{

class CompressionCodecDelta : public ICompressionCodec
class CompressionCodecDeltaFOR : public ICompressionCodec
{
public:
explicit CompressionCodecDelta(UInt8 delta_bytes_size_);
explicit CompressionCodecDeltaFOR(UInt8 bytes_size_);

UInt8 getMethodByte() const override;

Expand All @@ -36,13 +36,13 @@ class CompressionCodecDelta : public ICompressionCodec
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size)
const override;

UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 1; }
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;

bool isCompression() const override { return false; }
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }

private:
const UInt8 delta_bytes_size;
const UInt8 bytes_size;
};

} // namespace DB
Loading

0 comments on commit 2251037

Please sign in to comment.