Skip to content

Commit

Permalink
Compression: Refactor to support multiple compression (#8790)
Browse files Browse the repository at this point in the history
ref #8789
  • Loading branch information
Lloyd-Pottiger authored Mar 19, 2024
1 parent f061a58 commit 63e83b3
Show file tree
Hide file tree
Showing 31 changed files with 2,024 additions and 645 deletions.
7 changes: 5 additions & 2 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,11 @@ if (NOT USE_INTERNAL_ZSTD_LIBRARY)
endif ()

if (USE_QPL)
target_link_libraries (dbms ${QPL_LIBRARY})
target_include_directories (dbms PRIVATE ${QPL_INCLUDE_DIR})
target_link_libraries (tiflash_common_io PUBLIC ${QPL_LIBRARY})
target_include_directories (dbms BEFORE PRIVATE ${QPL_INCLUDE_DIR})
set_source_files_properties (
src/IO/Compression/CompressionCodecDeflateQpl.cpp
PROPERTIES COMPILE_FLAGS "-mwaitpkg")
endif ()

target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <DataTypes/DataTypeNullable.h>
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <IO/Buffer/ReadBufferFromString.h>
#include <IO/Compression/CompressionFactory.h>
#include <IO/Compression/CompressionInfo.h>

namespace DB
{
Expand Down Expand Up @@ -554,7 +556,9 @@ CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(std::string_view str,
assert(compression_method != CompressionMethod::NONE);

String compressed_buffer;
size_t compressed_size = CompressionEncode(str, CompressionSettings(compression_method), compressed_buffer);
auto codec = CompressionFactory::create(CompressionSettings(compression_method));
compressed_buffer.resize(codec->getCompressedReserveSize(str.size()));
size_t compressed_size = codec->compress(str.data(), str.size(), compressed_buffer.data());
compressed_buffer.resize(compressed_size);
return compressed_buffer;
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <IO/Compression/CompressedReadBuffer.h>
#include <IO/Compression/CompressedStream.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <IO/Compression/CompressionMethod.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/ChunkDecodeAndSquash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <Flash/Coprocessor/ChunkDecodeAndSquash.h>
#include <IO/Buffer/ReadBufferFromString.h>
#include <IO/Compression/CompressionInfo.h>

namespace DB
{
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/Coprocessor/tests/gtest_block_chunk_codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <IO/Buffer/ReadBufferFromString.h>
#include <TestUtils/ColumnGenerator.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <gtest/gtest.h>


Expand Down Expand Up @@ -69,6 +70,7 @@ void test_enocde_release_data(VecCol && batch_columns, const Block & header, con
}

TEST(CHBlockChunkCodec, ChunkCodecV1)
try
{
size_t block_num = 10;
size_t rows = 10;
Expand Down Expand Up @@ -217,14 +219,15 @@ TEST(CHBlockChunkCodec, ChunkCodecV1)
ASSERT_FALSE(source_str.empty());
ASSERT_EQ(static_cast<CompressionMethodByte>(source_str[0]), CompressionMethodByte::NONE);

for (auto mode : {CompressionMethod::LZ4, CompressionMethod::ZSTD})
for (const auto method : {CompressionMethod::LZ4, CompressionMethod::ZSTD})
{
auto compressed_str_a = CHBlockChunkCodecV1::encode({&source_str[1], source_str.size() - 1}, mode);
auto compressed_str_b = CHBlockChunkCodecV1{header}.encode(blocks.front(), mode);
auto compressed_str_a = CHBlockChunkCodecV1::encode({&source_str[1], source_str.size() - 1}, method);
auto compressed_str_b = CHBlockChunkCodecV1{header}.encode(blocks.front(), method);

ASSERT_EQ(compressed_str_a, compressed_str_b);
}
}
}
CATCH

} // namespace DB::tests
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/MPPTunnelSetHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Flash/Coprocessor/CHBlockChunkCodec.h>
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <Flash/Mpp/MPPTunnelSetHelper.h>
#include <IO/Compression/CompressionInfo.h>

namespace DB::MPPTunnelSetHelper
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/MPPTunnelSetWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#pragma once

#include <Flash/Mpp/MPPTunnelSet.h>
#include <IO/Compression/CompressedStream.h>
#include <IO/Compression/CompressionMethod.h>

namespace DB
{
Expand Down
275 changes: 0 additions & 275 deletions dbms/src/IO/Compression/CodecDeflateQpl.cpp

This file was deleted.

Loading

0 comments on commit 63e83b3

Please sign in to comment.