Skip to content

Commit

Permalink
ci
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Nov 13, 2024
1 parent e591694 commit a59e61a
Show file tree
Hide file tree
Showing 14 changed files with 550 additions and 28 deletions.
153 changes: 153 additions & 0 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,157 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive);
}

namespace
{

// Returns <offsets_stream, chars_stream>.
template <typename B, typename G>
std::pair<B *, B *> getStream(const G & getter, IDataType::SubstreamPath & path)
{
auto * chars_stream = getter(path);
path.emplace_back(IDataType::Substream::StringSizes);
auto * offsets_stream = getter(path);
return {offsets_stream, chars_stream};
}

PaddedPODArray<UInt64> offsetToStrSize(const ColumnString::Offsets & chars_offsets, size_t begin, size_t end)
{
PaddedPODArray<UInt64> str_sizes(end - begin);
auto last_chars_offset = begin == 0 ? 0 : chars_offsets[begin - 1];
for (auto i = begin; i < end; ++i)
{
str_sizes[i - begin] = chars_offsets[i] - last_chars_offset;
last_chars_offset = chars_offsets[i];
}
return str_sizes;
}

void strSizeToOffset(const PaddedPODArray<UInt64> & str_sizes, ColumnString::Offsets & chars_offsets)
{
auto initial_size = chars_offsets.size();
chars_offsets.resize(initial_size + str_sizes.size());
auto last_offset = initial_size == 0 ? 0 : chars_offsets[initial_size - 1];
for (size_t i = 0; i < str_sizes.size(); ++i)
{
chars_offsets[initial_size + i] = str_sizes[i] + last_offset;
last_offset = chars_offsets[initial_size + i];
}
}

std::pair<size_t, size_t> serializeOffsetsBinary(
const ColumnString::Offsets & chars_offsets,
WriteBuffer & ostr,
size_t offset,
size_t limit)
{
// [begin, end) is the range that need to be serialized of `chars_offsets`.
const auto begin = offset;
const auto end = limit != 0 && offset + limit < chars_offsets.size() ? offset + limit : chars_offsets.size();

PaddedPODArray<UInt64> sizes = offsetToStrSize(chars_offsets, begin, end);
ostr.write(reinterpret_cast<const char *>(sizes.data()), sizeof(UInt64) * sizes.size());

// [chars_begin, chars_end) is the range that need to be serialized of `chars`.
const auto chars_begin = begin == 0 ? 0 : chars_offsets[begin - 1];
const auto chars_end = chars_offsets[end - 1];
return {chars_begin, chars_end};
}

void serializeCharsBinary(const ColumnString::Chars_t & chars, WriteBuffer & ostr, size_t begin, size_t end)
{
ostr.write(reinterpret_cast<const char *>(&chars[begin]), end - begin);
}

size_t deserializeOffsetsBinary(ColumnString::Offsets & chars_offsets, ReadBuffer & istr, size_t limit)
{
PaddedPODArray<UInt64> str_sizes(limit);
const auto size = istr.readBig(reinterpret_cast<char *>(str_sizes.data()), sizeof(UInt64) * limit);
str_sizes.resize(size / sizeof(UInt64));
strSizeToOffset(str_sizes, chars_offsets);
return std::accumulate(str_sizes.begin(), str_sizes.end(), 0uz);
}

void deserializeCharsBinary(ColumnString::Chars_t & chars, ReadBuffer & istr, size_t bytes)
{
const auto initial_size = chars.size();
chars.resize(initial_size + bytes);
istr.readStrict(reinterpret_cast<char *>(&chars[initial_size]), bytes);
}

void serializeBinaryBulkV2(
const IColumn & column,
WriteBuffer & offsets_stream,
WriteBuffer & chars_stream,
size_t offset,
size_t limit)
{
if (column.size() == 0)
return;
const auto & column_string = typeid_cast<const ColumnString &>(column);
const auto & chars = column_string.getChars();
const auto & offsets = column_string.getOffsets();
auto [chars_begin, chars_end] = serializeOffsetsBinary(offsets, offsets_stream, offset, limit);
serializeCharsBinary(chars, chars_stream, chars_begin, chars_end);
}

void deserializeBinaryBulkV2(IColumn & column, ReadBuffer & offsets_stream, ReadBuffer & chars_stream, size_t limit)
{
auto & column_string = typeid_cast<ColumnString &>(column);
auto & chars = column_string.getChars();
auto & offsets = column_string.getOffsets();
auto bytes = deserializeOffsetsBinary(offsets, offsets_stream, limit);
deserializeCharsBinary(chars, chars_stream, bytes);
}

} // namespace

void DataTypeString::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
if (version == 1)
{
path.emplace_back(Substream::StringSizes);
callback(path);
}
}

void DataTypeString::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool /*position_independent_encoding*/,
SubstreamPath & path) const
{
if (version == 1)
{
auto [offsets_stream, chars_stream] = getStream<WriteBuffer, IDataType::OutputStreamGetter>(getter, path);
serializeBinaryBulkV2(column, *offsets_stream, *chars_stream, offset, limit);
}
else
{
serializeBinaryBulk(column, *getter(path), offset, limit);
}
}

void DataTypeString::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool /*position_independent_encoding*/,
SubstreamPath & path) const
{
if (version == 1)
{
auto [offsets_stream, chars_stream] = getStream<ReadBuffer, IDataType::InputStreamGetter>(getter, path);
deserializeBinaryBulkV2(column, *offsets_stream, *chars_stream, limit);
}
else
{
deserializeBinaryBulk(column, *getter(path), limit, avg_value_size_hint);
}
}


} // namespace DB
26 changes: 26 additions & 0 deletions dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,32 @@ class DataTypeString final : public IDataType
bool isString() const override { return true; }
bool isCategorial() const override { return true; }
bool canBeInsideNullable() const override { return true; }

void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;

void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath & path) const override;

void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const override;

DataTypeString(Int32 version_ = 1)
: version(version_)
{}
Int32 getVersion() const { return version; }

private:
const Int32 version;
};

} // namespace DB
9 changes: 9 additions & 0 deletions dbms/src/DataTypes/IDataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ bool IDataType::isArraySizes(const SubstreamPath & path)
return false;
}

bool IDataType::isStringSizes(const SubstreamPath & path)
{
return std::any_of(path.cbegin(), path.cend(), [](const auto & elem) {
return elem.type == IDataType::Substream::StringSizes;
});
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
Expand All @@ -127,6 +134,8 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
/// and name is encoded as a whole.
stream_name += "%2E" + escapeForFileName(elem.tuple_element_name);
}
else if (elem.type == Substream::StringSizes)
stream_name += ".size";
}
return stream_name;
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataTypes/IDataType.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class IDataType : private boost::noncopyable
NullMap,

TupleElement,

StringSizes,
};
Type type;

Expand Down Expand Up @@ -421,6 +423,7 @@ class IDataType : private boost::noncopyable

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
static bool isStringSizes(const SubstreamPath & path);
};


Expand Down
152 changes: 152 additions & 0 deletions dbms/src/DataTypes/tests/bench_data_type_string.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#include <DataTypes/DataTypeString.h>
#include <IO/Buffer/ReadBufferFromString.h>
#include <IO/Buffer/WriteBufferFromString.h>
#include <Poco/UUIDGenerator.h>
#include <TestUtils/FunctionTestUtils.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <benchmark/benchmark.h>

namespace DB::bench
{

String getStreamName(const String & column_name, const IDataType::SubstreamPath & substream_path)
{
return IDataType::getFileNameForStream(column_name, substream_path);
}

String randomString(size_t str_size)
{
auto s = Poco::UUIDGenerator().createRandom().toString();
while (s.size() < str_size)
{
s += Poco::UUIDGenerator().createRandom().toString();
}
return s.substr(str_size);
}

ColumnPtr createColumnString(size_t str_size, size_t count)
{
auto gen = Poco::UUIDGenerator();
std::vector<String> v(count);
for (size_t i = 0; i < v.size(); ++i)
{
v[i] = randomString(str_size);
}
return DB::tests::createColumn<String>(v, "", 0).column;
}

auto initWriteStream(IDataType & type)
{
std::unordered_map<String, std::unique_ptr<WriteBufferFromOwnString>> write_streams;
auto create_write_stream = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = getStreamName("bench", substream_path);
write_streams.emplace(stream_name, std::make_unique<WriteBufferFromOwnString>(100 * 1024 * 1024));
};
type.enumerateStreams(create_write_stream, {});
return write_streams;
}

constexpr size_t str_count = 65535;

template <typename... Args>
void serialize(benchmark::State & state, Args &&... args)
{
auto [version, str_size] = std::make_tuple(std::move(args)...);
auto str_col = createColumnString(str_size, str_count);
DataTypeString t(version);
IDataType & type = t;
auto write_streams = initWriteStream(type);
auto get_write_stream = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = getStreamName("bench", substream_path);
auto * write_stream = write_streams.at(stream_name).get();
write_stream->restart(); // Reset to avoid write buffer overflow.
return write_stream;
};
for (auto _ : state)
{
type.serializeBinaryBulkWithMultipleStreams(*str_col, get_write_stream, 0, str_col->size(), true, {});
}
}

template <typename... Args>
void deserialize(benchmark::State & state, Args &&... args)
{
auto [version, str_size] = std::make_tuple(std::move(args)...);
auto str_col = createColumnString(str_size, str_count);
DataTypeString t(version);
IDataType & type = t;
auto write_streams = initWriteStream(type);
auto get_write_stream = [&](const IDataType::SubstreamPath & substream_path) {
const auto stream_name = getStreamName("bench", substream_path);
auto * write_stream = write_streams.at(stream_name).get();
return write_stream;
};
type.serializeBinaryBulkWithMultipleStreams(*str_col, get_write_stream, 0, str_col->size(), true, {});

std::unordered_map<String, std::unique_ptr<ReadBuffer>> read_streams;
auto get_read_stream = [&](const IDataType::SubstreamPath & substream_path) {
auto * write_stream = get_write_stream(substream_path);
const auto stream_name = getStreamName("bench", substream_path);
read_streams[stream_name] = std::make_unique<ReadBufferFromString>(write_stream->stringRef().toStringView());
return read_streams[stream_name].get();
};
for (auto _ : state)
{
auto col = type.createColumn();
type.deserializeBinaryBulkWithMultipleStreams(*col, get_read_stream, 65535, 8, true, {});
benchmark::DoNotOptimize(col);
}
}

BENCHMARK_CAPTURE(serialize, v0_size8, 0, 8);
BENCHMARK_CAPTURE(serialize, v0_size16, 0, 16);
BENCHMARK_CAPTURE(serialize, v0_size32, 0, 32);
BENCHMARK_CAPTURE(serialize, v0_size64, 0, 64);
BENCHMARK_CAPTURE(serialize, v0_size128, 0, 128);
BENCHMARK_CAPTURE(serialize, v0_size256, 0, 256);
BENCHMARK_CAPTURE(serialize, v0_size512, 0, 512);
BENCHMARK_CAPTURE(serialize, v0_size1024, 0, 1024);

BENCHMARK_CAPTURE(serialize, v1_size8, 1, 8);
BENCHMARK_CAPTURE(serialize, v1_size16, 1, 16);
BENCHMARK_CAPTURE(serialize, v1_size32, 1, 32);
BENCHMARK_CAPTURE(serialize, v1_size64, 1, 64);
BENCHMARK_CAPTURE(serialize, v1_size128, 1, 128);
BENCHMARK_CAPTURE(serialize, v1_size256, 1, 256);
BENCHMARK_CAPTURE(serialize, v1_size512, 1, 512);
BENCHMARK_CAPTURE(serialize, v1_size1024, 1, 1024);

BENCHMARK_CAPTURE(deserialize, v0_size8, 0, 8);
BENCHMARK_CAPTURE(deserialize, v0_size16, 0, 16);
BENCHMARK_CAPTURE(deserialize, v0_size32, 0, 32);
BENCHMARK_CAPTURE(deserialize, v0_size64, 0, 64);
BENCHMARK_CAPTURE(deserialize, v0_size128, 0, 128);
BENCHMARK_CAPTURE(deserialize, v0_size256, 0, 256);
BENCHMARK_CAPTURE(deserialize, v0_size512, 0, 512);
BENCHMARK_CAPTURE(deserialize, v0_size1024, 0, 1024);

BENCHMARK_CAPTURE(deserialize, v1_size8, 1, 8);
BENCHMARK_CAPTURE(deserialize, v1_size16, 1, 16);
BENCHMARK_CAPTURE(deserialize, v1_size32, 1, 32);
BENCHMARK_CAPTURE(deserialize, v1_size64, 1, 64);
BENCHMARK_CAPTURE(deserialize, v1_size128, 1, 128);
BENCHMARK_CAPTURE(deserialize, v1_size256, 1, 256);
BENCHMARK_CAPTURE(deserialize, v1_size512, 1, 512);
BENCHMARK_CAPTURE(deserialize, v1_size1024, 1, 1024);

} // namespace DB::bench
Loading

0 comments on commit a59e61a

Please sign in to comment.