diff --git a/dbms/src/DataTypes/DataTypeString.cpp b/dbms/src/DataTypes/DataTypeString.cpp index 90f8e7f80ba..8ed95502f20 100644 --- a/dbms/src/DataTypes/DataTypeString.cpp +++ b/dbms/src/DataTypes/DataTypeString.cpp @@ -310,4 +310,181 @@ void registerDataTypeString(DataTypeFactory & factory) factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive); } +namespace +{ + +using Offset = ColumnString::Offset; + +// Returns . +template +std::pair getStream(const G & getter, IDataType::SubstreamPath & path) +{ + auto * chars_stream = getter(path); + path.emplace_back(IDataType::Substream::StringSizes); + auto * offsets_stream = getter(path); + return {offsets_stream, chars_stream}; +} + +PaddedPODArray offsetToStrSize( + const ColumnString::Offsets & chars_offsets, + const size_t begin, + const size_t end) +{ + assert(!chars_offsets.empty()); + PaddedPODArray str_sizes(end - begin); + size_t i = 0; + if (begin == 0) + { + str_sizes[0] = chars_offsets[0]; + ++i; + } + assert(begin + i > 0); + // clang-format off + #pragma clang loop vectorize(enable) + // clang-format on + for (; i < str_sizes.size(); ++i) + { + str_sizes[i] = chars_offsets[begin + i] - chars_offsets[begin + i - 1]; + } + return str_sizes; +} + +void strSizeToOffset(const PaddedPODArray & str_sizes, ColumnString::Offsets & chars_offsets) +{ + assert(!str_sizes.empty()); + const auto initial_size = chars_offsets.size(); + chars_offsets.resize(initial_size + str_sizes.size()); + size_t i = 0; + if (initial_size == 0) + { + chars_offsets[i] = str_sizes[0]; + ++i; + } + assert(initial_size + i > 0); + // Cannot be vectorize by compiler because chars_offsets[i] depends on chars_offsets[i-1] + // #pragma clang loop vectorize(enable) + for (; i < str_sizes.size(); ++i) + { + chars_offsets[i + initial_size] = str_sizes[i] + chars_offsets[i + initial_size - 1]; + } +} + +std::pair serializeOffsetsBinary( + const ColumnString::Offsets & chars_offsets, + WriteBuffer & ostr, + size_t offset, + size_t limit) +{ + // [begin, end) is the range that need to be serialized of `chars_offsets`. + const auto begin = offset; + const auto end = limit != 0 && offset + limit < chars_offsets.size() ? offset + limit : chars_offsets.size(); + + PaddedPODArray sizes = offsetToStrSize(chars_offsets, begin, end); + ostr.write(reinterpret_cast(sizes.data()), sizeof(Offset) * sizes.size()); + + // [chars_begin, chars_end) is the range that need to be serialized of `chars`. + const auto chars_begin = begin == 0 ? 0 : chars_offsets[begin - 1]; + const auto chars_end = chars_offsets[end - 1]; + return {chars_begin, chars_end}; +} + +void serializeCharsBinary(const ColumnString::Chars_t & chars, WriteBuffer & ostr, size_t begin, size_t end) +{ + ostr.write(reinterpret_cast(&chars[begin]), end - begin); +} + +size_t deserializeOffsetsBinary(ColumnString::Offsets & chars_offsets, ReadBuffer & istr, size_t limit) +{ + PaddedPODArray str_sizes(limit); + const auto size = istr.readBig(reinterpret_cast(str_sizes.data()), sizeof(Offset) * limit); + str_sizes.resize(size / sizeof(Offset)); + strSizeToOffset(str_sizes, chars_offsets); + return std::accumulate(str_sizes.begin(), str_sizes.end(), 0uz); +} + +void deserializeCharsBinary(ColumnString::Chars_t & chars, ReadBuffer & istr, size_t bytes) +{ + const auto initial_size = chars.size(); + chars.resize(initial_size + bytes); + istr.readStrict(reinterpret_cast(&chars[initial_size]), bytes); +} + +void serializeBinaryBulkV2( + const IColumn & column, + WriteBuffer & offsets_stream, + WriteBuffer & chars_stream, + size_t offset, + size_t limit) +{ + if (column.empty()) + return; + const auto & column_string = typeid_cast(column); + const auto & chars = column_string.getChars(); + const auto & offsets = column_string.getOffsets(); + auto [chars_begin, chars_end] = serializeOffsetsBinary(offsets, offsets_stream, offset, limit); + serializeCharsBinary(chars, chars_stream, chars_begin, chars_end); +} + +void deserializeBinaryBulkV2(IColumn & column, ReadBuffer & offsets_stream, ReadBuffer & chars_stream, size_t limit) +{ + if (limit == 0) + return; + auto & column_string = typeid_cast(column); + auto & chars = column_string.getChars(); + auto & offsets = column_string.getOffsets(); + auto bytes = deserializeOffsetsBinary(offsets, offsets_stream, limit); + deserializeCharsBinary(chars, chars_stream, bytes); +} + +} // namespace + +void DataTypeString::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const +{ + callback(path); + if (version == 1) + { + path.emplace_back(Substream::StringSizes); + callback(path); + } +} + +void DataTypeString::serializeBinaryBulkWithMultipleStreams( + const IColumn & column, + const OutputStreamGetter & getter, + size_t offset, + size_t limit, + bool /*position_independent_encoding*/, + SubstreamPath & path) const +{ + if (version == 1) + { + auto [offsets_stream, chars_stream] = getStream(getter, path); + serializeBinaryBulkV2(column, *offsets_stream, *chars_stream, offset, limit); + } + else + { + serializeBinaryBulk(column, *getter(path), offset, limit); + } +} + +void DataTypeString::deserializeBinaryBulkWithMultipleStreams( + IColumn & column, + const InputStreamGetter & getter, + size_t limit, + double avg_value_size_hint, + bool /*position_independent_encoding*/, + SubstreamPath & path) const +{ + if (version == 1) + { + auto [offsets_stream, chars_stream] = getStream(getter, path); + deserializeBinaryBulkV2(column, *offsets_stream, *chars_stream, limit); + } + else + { + deserializeBinaryBulk(column, *getter(path), limit, avg_value_size_hint); + } +} + + } // namespace DB diff --git a/dbms/src/DataTypes/DataTypeString.h b/dbms/src/DataTypes/DataTypeString.h index 1bc4ece42dd..ddb5fc7af0e 100644 --- a/dbms/src/DataTypes/DataTypeString.h +++ b/dbms/src/DataTypes/DataTypeString.h @@ -64,6 +64,37 @@ class DataTypeString final : public IDataType bool isString() const override { return true; } bool isCategorial() const override { return true; } bool canBeInsideNullable() const override { return true; } + + void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override; + + void serializeBinaryBulkWithMultipleStreams( + const IColumn & column, + const OutputStreamGetter & getter, + size_t offset, + size_t limit, + bool position_independent_encoding, + SubstreamPath & path) const override; + + void deserializeBinaryBulkWithMultipleStreams( + IColumn & column, + const InputStreamGetter & getter, + size_t limit, + double avg_value_size_hint, + bool position_independent_encoding, + SubstreamPath & path) const override; + + DataTypeString(Int32 version_ = 1) + : version(version_) + { + assert(version == 0 || version == 1); + } + Int32 getVersion() const { return version; } + +private: + // `version` == 0, use size-prefix format in serialization/deserialization. + // `version` == 1, seperate sizes and chars in serialization/deserialization. + // Default value is 1, 0 is use to read old data. + const Int32 version = 1; }; } // namespace DB diff --git a/dbms/src/DataTypes/IDataType.cpp b/dbms/src/DataTypes/IDataType.cpp index b131adca00a..fafb9f87382 100644 --- a/dbms/src/DataTypes/IDataType.cpp +++ b/dbms/src/DataTypes/IDataType.cpp @@ -103,6 +103,13 @@ bool IDataType::isArraySizes(const SubstreamPath & path) return false; } +bool IDataType::isStringSizes(const SubstreamPath & path) +{ + return std::any_of(path.cbegin(), path.cend(), [](const auto & elem) { + return elem.type == IDataType::Substream::StringSizes; + }); +} + String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path) { String nested_table_name = Nested::extractTableName(column_name); @@ -127,6 +134,8 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy /// and name is encoded as a whole. stream_name += "%2E" + escapeForFileName(elem.tuple_element_name); } + else if (elem.type == Substream::StringSizes) + stream_name += ".size"; } return stream_name; } diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index b9540aee8f4..a5d4dc88e73 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -95,6 +95,8 @@ class IDataType : private boost::noncopyable NullMap, TupleElement, + + StringSizes, }; Type type; @@ -421,6 +423,7 @@ class IDataType : private boost::noncopyable static bool isNullMap(const SubstreamPath & path); static bool isArraySizes(const SubstreamPath & path); + static bool isStringSizes(const SubstreamPath & path); }; diff --git a/dbms/src/DataTypes/tests/bench_data_type_string.cpp b/dbms/src/DataTypes/tests/bench_data_type_string.cpp new file mode 100644 index 00000000000..7a0a86fd211 --- /dev/null +++ b/dbms/src/DataTypes/tests/bench_data_type_string.cpp @@ -0,0 +1,278 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +namespace DB::bench +{ + +String getStreamName(const String & column_name, const IDataType::SubstreamPath & substream_path) +{ + return IDataType::getFileNameForStream(column_name, substream_path); +} + +ColumnPtr createColumnString(size_t str_size, size_t count) +{ + std::random_device rand_dev; + std::mt19937_64 rand_gen(rand_dev()); + std::uniform_int_distribution rand_size(str_size * 0.8, str_size * 1.2); + std::vector v(count); + for (auto & s : v) + s = DB::random::randomString(rand_size(rand_gen)); + + return DB::tests::createColumn(v, "", 0).column; +} + +using WriteBufferPair = std::pair, std::unique_ptr>; +WriteBufferPair createWriteBuffer(const String & stream_name, CompressionMethod method) +{ + auto write_buffer = std::make_unique(100 * 1024 * 1024); + std::unique_ptr compressed_buf; + if (method != CompressionMethod::NONE) + { + CompressionSetting setting{method}; + setting.data_type = stream_name.ends_with(".size") ? CompressionDataType::Int64 : CompressionDataType::String; + compressed_buf = CompressedWriteBuffer<>::build(*write_buffer, CompressionSettings{setting}, false); + } + return {std::move(compressed_buf), std::move(write_buffer)}; +} + +using ReadBufferPair = std::pair, std::unique_ptr>; +ReadBufferPair createReadBuffer(const WriteBufferFromOwnString & write_buffer, bool enable_compression) +{ + auto read_buffer = std::make_unique(write_buffer.stringRef().toStringView()); + std::unique_ptr compressed_buf; + if (enable_compression) + compressed_buf = std::make_unique>(*read_buffer); + return {std::move(compressed_buf), std::move(read_buffer)}; +} + +auto initWriteStream(IDataType & type, CompressionMethod method) +{ + std::unordered_map write_streams; + auto create_write_stream = [&](const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName("bench", substream_path); + write_streams.emplace(stream_name, createWriteBuffer(stream_name, method)); + }; + type.enumerateStreams(create_write_stream, {}); + return write_streams; +} + +constexpr size_t str_count = 65535; + +template +void serialize(benchmark::State & state, Args &&... args) +{ + auto [version, str_size, method] = std::make_tuple(std::move(args)...); + auto str_col = createColumnString(str_size, str_count); + DataTypeString t(version); + IDataType & type = t; + auto write_streams = initWriteStream(type, method); + auto get_write_stream = [&](const IDataType::SubstreamPath & substream_path) -> WriteBuffer * { + const auto stream_name = getStreamName("bench", substream_path); + auto & [compress_buf, write_buffer] = write_streams.at(stream_name); + write_buffer->restart(); // Reset to avoid write buffer overflow. + if (compress_buf) + return compress_buf.get(); + return write_buffer.get(); + }; + auto flush_stream = [&](const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName("bench", substream_path); + auto & [compress_buf, write_buffer] = write_streams.at(stream_name); + if (compress_buf) + compress_buf->next(); + }; + for (auto _ : state) + { + type.serializeBinaryBulkWithMultipleStreams(*str_col, get_write_stream, 0, str_col->size(), true, {}); + type.enumerateStreams(flush_stream, {}); + } +} + +template +void deserialize(benchmark::State & state, Args &&... args) +{ + auto [version, str_size, method] = std::make_tuple(std::move(args)...); + auto str_col = createColumnString(str_size, str_count); + DataTypeString t(version); + IDataType & type = t; + auto write_streams = initWriteStream(type, method); + auto get_write_stream = [&](const IDataType::SubstreamPath & substream_path) -> WriteBuffer * { + const auto stream_name = getStreamName("bench", substream_path); + auto & [compress_buf, write_buffer] = write_streams.at(stream_name); + if (compress_buf) + return compress_buf.get(); + return write_buffer.get(); + }; + auto flush_stream = [&](const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName("bench", substream_path); + auto & [compress_buf, write_buffer] = write_streams.at(stream_name); + if (compress_buf) + compress_buf->next(); + }; + type.serializeBinaryBulkWithMultipleStreams(*str_col, get_write_stream, 0, str_col->size(), true, {}); + type.enumerateStreams(flush_stream, {}); + + std::unordered_map read_streams; + auto get_read_stream = [&](const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName("bench", substream_path); + auto & [compress_buf, write_buffer] = write_streams.at(stream_name); + read_streams[stream_name] = createReadBuffer(*write_buffer, compress_buf != nullptr); + auto & [compressed_read_buffer, read_buffer] = read_streams[stream_name]; + if (compressed_read_buffer) + return compressed_read_buffer.get(); + return read_buffer.get(); + }; + for (auto _ : state) + { + auto col = type.createColumn(); + type.deserializeBinaryBulkWithMultipleStreams(*col, get_read_stream, str_count, str_size, true, {}); + benchmark::DoNotOptimize(col); + } +} + +BENCHMARK_CAPTURE(serialize, v0_size1_none, 0, 1, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size2_none, 0, 2, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size4_none, 0, 4, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size8_none, 0, 8, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size16_none, 0, 16, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size32_none, 0, 32, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size64_none, 0, 64, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size128_none, 0, 128, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size256_none, 0, 256, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size512_none, 0, 512, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v0_size1024_none, 0, 1024, CompressionMethod::NONE); + +BENCHMARK_CAPTURE(serialize, v1_size1_none, 1, 1, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size2_none, 1, 2, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size4_none, 1, 4, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size8_none, 1, 8, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size16_none, 1, 16, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size32_none, 1, 32, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size64_none, 1, 64, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size128_none, 1, 128, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size256_none, 1, 256, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size512_none, 1, 512, CompressionMethod::NONE); +BENCHMARK_CAPTURE(serialize, v1_size1024_none, 1, 1024, CompressionMethod::NONE); + +BENCHMARK_CAPTURE(deserialize, v0_size1_none, 0, 1, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size2_none, 0, 2, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size4_none, 0, 4, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size8_none, 0, 8, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size16_none, 0, 16, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size32_none, 0, 32, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size64_none, 0, 64, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size128_none, 0, 128, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size256_none, 0, 256, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size512_none, 0, 512, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v0_size1024_none, 0, 1024, CompressionMethod::NONE); + +BENCHMARK_CAPTURE(deserialize, v1_size1_none, 1, 1, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size2_none, 1, 2, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size4_none, 1, 4, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size8_none, 1, 8, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size16_none, 1, 16, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size32_none, 1, 32, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size64_none, 1, 64, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size128_none, 1, 128, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size256_none, 1, 256, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size512_none, 1, 512, CompressionMethod::NONE); +BENCHMARK_CAPTURE(deserialize, v1_size1024_none, 1, 1024, CompressionMethod::NONE); + +BENCHMARK_CAPTURE(serialize, v0_size1_lz4, 0, 1, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size2_lz4, 0, 2, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size4_lz4, 0, 4, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size8_lz4, 0, 8, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size16_lz4, 0, 16, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size32_lz4, 0, 32, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size64_lz4, 0, 64, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size128_lz4, 0, 128, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size256_lz4, 0, 256, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size512_lz4, 0, 512, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v0_size1024_lz4, 0, 1024, CompressionMethod::LZ4); + +BENCHMARK_CAPTURE(serialize, v1_size1_lz4, 1, 1, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size2_lz4, 1, 2, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size4_lz4, 1, 4, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size8_lz4, 1, 8, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size16_lz4, 1, 16, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size32_lz4, 1, 32, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size64_lz4, 1, 64, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size128_lz4, 1, 128, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size256_lz4, 1, 256, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size512_lz4, 1, 512, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(serialize, v1_size1024_lz4, 1, 1024, CompressionMethod::LZ4); + +BENCHMARK_CAPTURE(deserialize, v0_size1_lz4, 0, 1, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size2_lz4, 0, 2, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size4_lz4, 0, 4, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size8_lz4, 0, 8, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size16_lz4, 0, 16, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size32_lz4, 0, 32, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size64_lz4, 0, 64, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size128_lz4, 0, 128, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size256_lz4, 0, 256, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size512_lz4, 0, 512, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v0_size1024_lz4, 0, 1024, CompressionMethod::LZ4); + +BENCHMARK_CAPTURE(deserialize, v1_size1_lz4, 1, 1, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size2_lz4, 1, 2, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size4_lz4, 1, 4, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size8_lz4, 1, 8, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size16_lz4, 1, 16, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size32_lz4, 1, 32, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size64_lz4, 1, 64, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size128_lz4, 1, 128, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size256_lz4, 1, 256, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size512_lz4, 1, 512, CompressionMethod::LZ4); +BENCHMARK_CAPTURE(deserialize, v1_size1024_lz4, 1, 1024, CompressionMethod::LZ4); + +BENCHMARK_CAPTURE(serialize, v1_size1_lw, 1, 1, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size2_lw, 1, 2, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size4_lw, 1, 4, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size8_lw, 1, 8, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size16_lw, 1, 16, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size32_lw, 1, 32, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size64_lw, 1, 64, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size128_lw, 1, 128, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size256_lw, 1, 256, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size512_lw, 1, 512, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(serialize, v1_size1024_lw, 1, 1024, CompressionMethod::Lightweight); + +BENCHMARK_CAPTURE(deserialize, v1_size1_lw, 1, 1, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size2_lw, 1, 2, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size4_lw, 1, 4, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size8_lw, 1, 8, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size16_lw, 1, 16, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size32_lw, 1, 32, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size64_lw, 1, 64, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size128_lw, 1, 128, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size256_lw, 1, 256, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size512_lw, 1, 512, CompressionMethod::Lightweight); +BENCHMARK_CAPTURE(deserialize, v1_size1024_lw, 1, 1024, CompressionMethod::Lightweight); +} // namespace DB::bench diff --git a/dbms/src/DataTypes/tests/gtest_data_type_string.cpp b/dbms/src/DataTypes/tests/gtest_data_type_string.cpp new file mode 100644 index 00000000000..b6fa8092fd0 --- /dev/null +++ b/dbms/src/DataTypes/tests/gtest_data_type_string.cpp @@ -0,0 +1,169 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +namespace DB::tests +{ +class DataTypeStringTest : public ::testing::Test +{ +public: + void SetUp() override {} + + void TearDown() override {} + +protected: + static String getStreamName(const String & column_name, const IDataType::SubstreamPath & substream_path) + { + return IDataType::getFileNameForStream(column_name, substream_path); + } + + void initWriteStream() + { + auto create_write_stream = [&](const String & column_name, const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName(column_name, substream_path); + write_streams.emplace(stream_name, std::make_unique(10 * 1024 * 1024)); + }; + auto create_write_stream1 = [&](const IDataType::SubstreamPath & substream_path) { + create_write_stream("1", substream_path); + }; + auto create_write_stream2 = [&](const IDataType::SubstreamPath & substream_path) { + create_write_stream("2", substream_path); + }; + str_v0.enumerateStreams(create_write_stream1, {}); + str_v1.enumerateStreams(create_write_stream2, {}); + } + + void initReadStream() + { + auto create_read_stream = [&](const String & column_name, const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName(column_name, substream_path); + auto s = write_streams.at(stream_name)->stringRef().toStringView(); + read_streams.emplace(stream_name, std::make_unique(s)); + }; + auto create_read_stream1 = [&](const IDataType::SubstreamPath & substream_path) { + create_read_stream("1", substream_path); + }; + auto create_read_stream2 = [&](const IDataType::SubstreamPath & substream_path) { + create_read_stream("2", substream_path); + }; + str_v0.enumerateStreams(create_read_stream1, {}); + str_v1.enumerateStreams(create_read_stream2, {}); + } + + void serialize(const IColumn & col, size_t offset, size_t limit) + { + auto get_write_stream = [&](const String & column_name, const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName(column_name, substream_path); + return write_streams.at(stream_name).get(); + }; + auto get_write_stream1 = [&](const IDataType::SubstreamPath & substream_path) { + return get_write_stream("1", substream_path); + }; + auto get_write_stream2 = [&](const IDataType::SubstreamPath & substream_path) { + return get_write_stream("2", substream_path); + }; + str_v0.serializeBinaryBulkWithMultipleStreams(col, get_write_stream1, offset, limit, true, {}); + str_v1.serializeBinaryBulkWithMultipleStreams(col, get_write_stream2, offset, limit, true, {}); + } + + void deserialize(IColumn & col1, IColumn & col2, size_t limit) + { + auto get_read_stream = [&](const String & column_name, const IDataType::SubstreamPath & substream_path) { + const auto stream_name = getStreamName(column_name, substream_path); + return read_streams.at(stream_name).get(); + }; + auto get_read_stream1 = [&](const IDataType::SubstreamPath & substream_path) { + return get_read_stream("1", substream_path); + }; + auto get_read_stream2 = [&](const IDataType::SubstreamPath & substream_path) { + return get_read_stream("2", substream_path); + }; + str_v0.deserializeBinaryBulkWithMultipleStreams(col1, get_read_stream1, limit, 8, true, {}); + str_v1.deserializeBinaryBulkWithMultipleStreams(col2, get_read_stream2, limit, 8, true, {}); + } + + DataTypeString a{0}; + DataTypeString b{1}; + IDataType & str_v0 = a; + IDataType & str_v1 = b; + + std::unordered_map> write_streams; + std::unordered_map> read_streams; +}; + +TEST_F(DataTypeStringTest, BasicSerDe) +try +{ + auto str_col = DB::tests::createColumn(DM::tests::createNumberStrings(0, 65536), "", 0).column; + initWriteStream(); + ASSERT_EQ(write_streams.size(), 3); + serialize(*str_col, 0, str_col->size()); + initReadStream(); + ASSERT_EQ(read_streams.size(), 3); + auto col1 = str_v0.createColumn(); + auto col2 = str_v1.createColumn(); + deserialize(*col1, *col2, str_col->size()); + + ASSERT_EQ(col1->size(), str_col->size()); + ASSERT_EQ(col2->size(), str_col->size()); + for (size_t i = 0; i < col2->size(); ++i) + { + ASSERT_EQ(col1->getDataAt(i).toStringView(), str_col->getDataAt(i).toStringView()); + ASSERT_EQ(col2->getDataAt(i).toStringView(), str_col->getDataAt(i).toStringView()); + } +} +CATCH + +TEST_F(DataTypeStringTest, Concat) +try +{ + auto str_col = DB::tests::createColumn(DM::tests::createNumberStrings(0, 65536), "", 0).column; + initWriteStream(); + ASSERT_EQ(write_streams.size(), 3); + serialize(*str_col, 0, 10000); + serialize(*str_col, 10000, 20000); + serialize(*str_col, 30000, 30000); + serialize(*str_col, 60000, 40000); + + initReadStream(); + ASSERT_EQ(read_streams.size(), 3); + auto col1 = str_v0.createColumn(); + auto col2 = str_v1.createColumn(); + deserialize(*col1, *col2, 20000); + ASSERT_EQ(col1->size(), 20000); + ASSERT_EQ(col2->size(), 20000); + deserialize(*col1, *col2, 30000); + ASSERT_EQ(col1->size(), 50000); + ASSERT_EQ(col2->size(), 50000); + deserialize(*col1, *col2, 10000); + ASSERT_EQ(col1->size(), 60000); + ASSERT_EQ(col2->size(), 60000); + deserialize(*col1, *col2, 8000); + + ASSERT_EQ(col1->size(), str_col->size()); + ASSERT_EQ(col2->size(), str_col->size()); + for (size_t i = 0; i < col2->size(); ++i) + { + ASSERT_EQ(col1->getDataAt(i).toStringView(), str_col->getDataAt(i).toStringView()); + ASSERT_EQ(col2->getDataAt(i).toStringView(), str_col->getDataAt(i).toStringView()); + } +} +CATCH + +} // namespace DB::tests diff --git a/dbms/src/IO/Compression/CompressionSettings.h b/dbms/src/IO/Compression/CompressionSettings.h index a70eedff34b..7ba26ecb6e8 100644 --- a/dbms/src/IO/Compression/CompressionSettings.h +++ b/dbms/src/IO/Compression/CompressionSettings.h @@ -60,21 +60,31 @@ struct CompressionSetting : CompressionSetting(CompressionMethod::LZ4) {} - explicit CompressionSetting(CompressionMethod method_) + explicit CompressionSetting( + CompressionMethod method_, + CompressionDataType data_type_ = CompressionDataType::Unknown) : method(method_) , level(getDefaultLevel(method)) + , data_type(data_type_) , method_byte(method_byte_map[static_cast(method_)]) {} - explicit CompressionSetting(CompressionMethodByte method_byte_) + explicit CompressionSetting( + CompressionMethodByte method_byte_, + CompressionDataType data_type_ = CompressionDataType::Unknown) : method(method_map.at(method_byte_)) , level(getDefaultLevel(method)) + , data_type(data_type_) , method_byte(method_byte_) {} - CompressionSetting(CompressionMethod method_, int level_) + CompressionSetting( + CompressionMethod method_, + int level_, + CompressionDataType data_type_ = CompressionDataType::Unknown) : method(method_) , level(level_) + , data_type(data_type_) , method_byte(method_byte_map[static_cast(method_)]) {} diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h index b01f55fdd9c..b0ebb5cfda8 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStat.h +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStat.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -37,8 +38,8 @@ struct ColumnStat size_t nullmap_data_bytes = 0; size_t nullmap_mark_bytes = 0; size_t index_bytes = 0; - size_t array_sizes_bytes = 0; - size_t array_sizes_mark_bytes = 0; + size_t sizes_bytes = 0; // Array sizes or String sizes, depends on the data type of this column + size_t sizes_mark_bytes = 0; std::vector vector_index; @@ -59,8 +60,8 @@ struct ColumnStat stat.set_nullmap_data_bytes(nullmap_data_bytes); stat.set_nullmap_mark_bytes(nullmap_mark_bytes); stat.set_index_bytes(index_bytes); - stat.set_array_sizes_bytes(array_sizes_bytes); - stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes); + stat.set_sizes_bytes(sizes_bytes); + stat.set_sizes_mark_bytes(sizes_mark_bytes); for (const auto & vec_idx : vector_index) { @@ -86,8 +87,10 @@ struct ColumnStat nullmap_data_bytes = proto.nullmap_data_bytes(); nullmap_mark_bytes = proto.nullmap_mark_bytes(); index_bytes = proto.index_bytes(); - array_sizes_bytes = proto.array_sizes_bytes(); - array_sizes_mark_bytes = proto.array_sizes_mark_bytes(); + sizes_bytes = proto.sizes_bytes(); + sizes_mark_bytes = proto.sizes_mark_bytes(); + + checkOldStringSerializationFormat(); if (proto.has_vector_index()) { @@ -142,6 +145,19 @@ struct ColumnStat readIntBinary(nullmap_mark_bytes, buf); readIntBinary(index_bytes, buf); } + +private: + void checkOldStringSerializationFormat() + { + if (sizes_bytes != 0) + return; + + if (removeNullable(type)->getTypeId() != TypeIndex::String) + return; + + auto t = std::make_shared(0); + type = type->isNullable() ? makeNullable(t) : t; + } }; using ColumnStats = std::unordered_map; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 529c12d952b..db543860e4b 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -243,7 +243,8 @@ size_t DMFile::colDataSize(ColId id, ColDataType type) case ColDataType::NullMap: return itr->second.nullmap_data_bytes; case ColDataType::ArraySizes: - return itr->second.array_sizes_bytes; + case ColDataType::StringSizes: + return itr->second.sizes_bytes; } } else @@ -258,9 +259,11 @@ size_t DMFile::colDataSize(ColId id, ColDataType type) namebase = getFileNameBase(id, {IDataType::Substream::NullMap}); break; case ColDataType::ArraySizes: + case ColDataType::StringSizes: RUNTIME_CHECK_MSG( - type != ColDataType::ArraySizes, - "Can not get array map size by filename, col_id={} path={}", + false, + "Can not get size of {} by filename, col_id={} path={}", + magic_enum::enum_name(type), id, path()); break; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 1767ce8331d..92719572806 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -273,6 +273,7 @@ class DMFile : private boost::noncopyable Elements, NullMap, ArraySizes, + StringSizes, }; size_t colDataSize(ColId id, ColDataType type); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp index ec4ca7c0047..4a4169cbce1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp @@ -418,14 +418,14 @@ UInt64 DMFileMeta::getFileSize(ColId col_id, const String & filename) const { return itr->second.nullmap_mark_bytes; } - // Note that ".size0.dat"/".size0.mrk" must be check before ".dat"/".mrk" - else if (endsWith(filename, ".size0.dat")) + // Note that ".size0.dat"/".size0.mrk"/".size.dat"/".size.mrk" must be check before ".dat"/".mrk" + else if (endsWith(filename, ".size0.dat") || endsWith(filename, ".size.dat")) { - return itr->second.array_sizes_bytes; + return itr->second.sizes_bytes; } - else if (endsWith(filename, ".size0.mrk")) + else if (endsWith(filename, ".size0.mrk") || endsWith(filename, ".size.mrk")) { - return itr->second.array_sizes_mark_bytes; + return itr->second.sizes_mark_bytes; } else if (endsWith(filename, ".dat")) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp index 2e6ca7276c4..a5f3f9810e6 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp @@ -370,11 +370,14 @@ void DMFileMetaV2::finalizeSmallFiles( delete_file_name.emplace_back(std::move(fname)); } - // check .size0.dat - if (stat.array_sizes_bytes > 0 && stat.array_sizes_bytes <= small_file_size_threshold) + // check .size0.dat and .size.dat + if (stat.sizes_bytes > 0 && stat.sizes_bytes <= small_file_size_threshold) { - auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::ArraySizes})); - auto fsize = stat.array_sizes_bytes; + auto substream = removeNullable(stat.type)->getTypeId() == TypeIndex::String + ? IDataType::Substream::StringSizes + : IDataType::Substream::ArraySizes; + auto fname = colDataFileName(getFileNameBase(col_id, {substream})); + auto fsize = stat.sizes_bytes; copy_file_to_cur(fname, fsize); delete_file_name.emplace_back(std::move(fname)); } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp index 3186193e13a..78f415206a4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp @@ -204,7 +204,8 @@ size_t DMFileVectorIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_muta const auto & cd = read_columns[col_idx]; // Save index and update column stats auto callback = [&](const IDataType::SubstreamPath & substream_path) -> void { - if (IDataType::isNullMap(substream_path) || IDataType::isArraySizes(substream_path)) + if (IDataType::isNullMap(substream_path) || IDataType::isArraySizes(substream_path) + || IDataType::isStringSizes(substream_path)) return; std::vector new_indexes; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 780567b22ff..fe8a6b3d505 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -113,7 +113,8 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) { auto callback = [&](const IDataType::SubstreamPath & substream_path) { const auto stream_name = DMFile::getFileNameBase(col_id, substream_path); - bool substream_can_index = !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path); + bool substream_can_index = !IDataType::isNullMap(substream_path) && !IDataType::isArraySizes(substream_path) + && !IDataType::isStringSizes(substream_path); auto stream = std::make_unique( dmfile, stream_name, @@ -125,7 +126,6 @@ void DMFileWriter::addStreams(ColId col_id, DataTypePtr type, bool do_index) do_index && substream_can_index); column_streams.emplace(stream_name, std::move(stream)); }; - type->enumerateStreams(callback, {}); } @@ -280,6 +280,7 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) const bool is_null = IDataType::isNullMap(substream); const bool is_array = IDataType::isArraySizes(substream); + const bool is_string_sizes = IDataType::isStringSizes(substream); // v3 if (dmfile->useMetaV2()) @@ -296,9 +297,9 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) { col_stat.nullmap_data_bytes = stream->plain_file->getMaterializedBytes(); } - else if (is_array) + else if (is_array || is_string_sizes) { - col_stat.array_sizes_bytes = stream->plain_file->getMaterializedBytes(); + col_stat.sizes_bytes = stream->plain_file->getMaterializedBytes(); } else { @@ -365,9 +366,9 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type) { col_stat.nullmap_mark_bytes = mark_size; } - else if (is_array) + else if (is_array || is_string_sizes) { - col_stat.array_sizes_mark_bytes = mark_size; + col_stat.sizes_mark_bytes = mark_size; } else { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h index b7836c21321..fa0dce780f7 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.h @@ -69,10 +69,7 @@ class DMFileWriter , minmaxes(do_index ? std::make_shared(*type) : nullptr) { assert(compression_settings.settings.size() == 1); - auto setting = CompressionSetting::create<>( - compression_settings.settings[0].method, - compression_settings.settings[0].level, - *type); + auto setting = getCompressionSetting(type, file_base_name, compression_settings.settings[0]); compressed_buf = CompressedWriteBuffer<>::build( *plain_file, CompressionSettings(setting), @@ -97,6 +94,23 @@ class DMFileWriter } } + static bool isStringSizes(const DataTypePtr & type, const String & file_base_name) + { + return removeNullable(type)->getTypeId() == TypeIndex::String && file_base_name.ends_with(".size"); + } + + static CompressionSetting getCompressionSetting( + const DataTypePtr & type, + const String & file_base_name, + const CompressionSetting & setting) + { + // Force use Lightweight compression for string sizes, since the string sizes almost always small. + // Performance of LZ4 to decompress such integers is not very well. + return isStringSizes(type, file_base_name) + ? CompressionSetting{CompressionMethod::Lightweight, CompressionDataType::Int64} + : CompressionSetting::create<>(setting.method, setting.level, *type); + } + // compressed_buf -> plain_file WriteBufferFromFileBasePtr plain_file; WriteBufferPtr compressed_buf; diff --git a/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto b/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto index 1161ee6c7b4..ca2cf1dbcc3 100644 --- a/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto +++ b/dbms/src/Storages/DeltaMerge/dtpb/dmfile.proto @@ -61,8 +61,8 @@ message ColumnStat { optional uint64 nullmap_data_bytes = 7; optional uint64 nullmap_mark_bytes = 8; optional uint64 index_bytes = 9; - optional uint64 array_sizes_bytes = 10; - optional uint64 array_sizes_mark_bytes = 11; + optional uint64 sizes_bytes = 10; + optional uint64 sizes_mark_bytes = 11; // Only used in tests. Modifying other fields of ColumnStat is hard. optional string additional_data_for_test = 101;