Skip to content

Commit

Permalink
Storages: New serialization/deserialization for DataTypeString
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Nov 26, 2024
1 parent e3e6780 commit d925107
Show file tree
Hide file tree
Showing 16 changed files with 750 additions and 34 deletions.
177 changes: 177 additions & 0 deletions dbms/src/DataTypes/DataTypeString.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,4 +310,181 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerSimpleDataType("LONGBLOB", creator, DataTypeFactory::CaseInsensitive);
}

namespace
{

using Offset = ColumnString::Offset;

// Returns <offsets_stream, chars_stream>.
template <typename B, typename G>
std::pair<B *, B *> getStream(const G & getter, IDataType::SubstreamPath & path)
{
auto * chars_stream = getter(path);
path.emplace_back(IDataType::Substream::StringSizes);
auto * offsets_stream = getter(path);
return {offsets_stream, chars_stream};
}

PaddedPODArray<Offset> offsetToStrSize(
const ColumnString::Offsets & chars_offsets,
const size_t begin,
const size_t end)
{
assert(!chars_offsets.empty());
PaddedPODArray<Offset> str_sizes(end - begin);
size_t i = 0;
if (begin == 0)
{
str_sizes[0] = chars_offsets[0];
++i;
}
assert(begin + i > 0);
// clang-format off
#pragma clang loop vectorize(enable)
// clang-format on
for (; i < str_sizes.size(); ++i)
{
str_sizes[i] = chars_offsets[begin + i] - chars_offsets[begin + i - 1];
}
return str_sizes;
}

void strSizeToOffset(const PaddedPODArray<Offset> & str_sizes, ColumnString::Offsets & chars_offsets)
{
assert(!str_sizes.empty());
const auto initial_size = chars_offsets.size();
chars_offsets.resize(initial_size + str_sizes.size());
size_t i = 0;
if (initial_size == 0)
{
chars_offsets[i] = str_sizes[0];
++i;
}
assert(initial_size + i > 0);
// Cannot be vectorize by compiler because chars_offsets[i] depends on chars_offsets[i-1]
// #pragma clang loop vectorize(enable)
for (; i < str_sizes.size(); ++i)
{
chars_offsets[i + initial_size] = str_sizes[i] + chars_offsets[i + initial_size - 1];
}
}

std::pair<size_t, size_t> serializeOffsetsBinary(
const ColumnString::Offsets & chars_offsets,
WriteBuffer & ostr,
size_t offset,
size_t limit)
{
// [begin, end) is the range that need to be serialized of `chars_offsets`.
const auto begin = offset;
const auto end = limit != 0 && offset + limit < chars_offsets.size() ? offset + limit : chars_offsets.size();

PaddedPODArray<Offset> sizes = offsetToStrSize(chars_offsets, begin, end);
ostr.write(reinterpret_cast<const char *>(sizes.data()), sizeof(Offset) * sizes.size());

// [chars_begin, chars_end) is the range that need to be serialized of `chars`.
const auto chars_begin = begin == 0 ? 0 : chars_offsets[begin - 1];
const auto chars_end = chars_offsets[end - 1];
return {chars_begin, chars_end};
}

void serializeCharsBinary(const ColumnString::Chars_t & chars, WriteBuffer & ostr, size_t begin, size_t end)
{
ostr.write(reinterpret_cast<const char *>(&chars[begin]), end - begin);
}

size_t deserializeOffsetsBinary(ColumnString::Offsets & chars_offsets, ReadBuffer & istr, size_t limit)
{
PaddedPODArray<Offset> str_sizes(limit);
const auto size = istr.readBig(reinterpret_cast<char *>(str_sizes.data()), sizeof(Offset) * limit);
str_sizes.resize(size / sizeof(Offset));
strSizeToOffset(str_sizes, chars_offsets);
return std::accumulate(str_sizes.begin(), str_sizes.end(), 0uz);
}

void deserializeCharsBinary(ColumnString::Chars_t & chars, ReadBuffer & istr, size_t bytes)
{
const auto initial_size = chars.size();
chars.resize(initial_size + bytes);
istr.readStrict(reinterpret_cast<char *>(&chars[initial_size]), bytes);
}

void serializeBinaryBulkV2(
const IColumn & column,
WriteBuffer & offsets_stream,
WriteBuffer & chars_stream,
size_t offset,
size_t limit)
{
if (column.empty())
return;
const auto & column_string = typeid_cast<const ColumnString &>(column);
const auto & chars = column_string.getChars();
const auto & offsets = column_string.getOffsets();
auto [chars_begin, chars_end] = serializeOffsetsBinary(offsets, offsets_stream, offset, limit);
serializeCharsBinary(chars, chars_stream, chars_begin, chars_end);
}

void deserializeBinaryBulkV2(IColumn & column, ReadBuffer & offsets_stream, ReadBuffer & chars_stream, size_t limit)
{
if (limit == 0)
return;
auto & column_string = typeid_cast<ColumnString &>(column);
auto & chars = column_string.getChars();
auto & offsets = column_string.getOffsets();
auto bytes = deserializeOffsetsBinary(offsets, offsets_stream, limit);
deserializeCharsBinary(chars, chars_stream, bytes);
}

} // namespace

void DataTypeString::enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const
{
callback(path);
if (version == 1)
{
path.emplace_back(Substream::StringSizes);
callback(path);
}
}

void DataTypeString::serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool /*position_independent_encoding*/,
SubstreamPath & path) const
{
if (version == 1)
{
auto [offsets_stream, chars_stream] = getStream<WriteBuffer, IDataType::OutputStreamGetter>(getter, path);
serializeBinaryBulkV2(column, *offsets_stream, *chars_stream, offset, limit);
}
else
{
serializeBinaryBulk(column, *getter(path), offset, limit);
}
}

void DataTypeString::deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool /*position_independent_encoding*/,
SubstreamPath & path) const
{
if (version == 1)
{
auto [offsets_stream, chars_stream] = getStream<ReadBuffer, IDataType::InputStreamGetter>(getter, path);
deserializeBinaryBulkV2(column, *offsets_stream, *chars_stream, limit);
}
else
{
deserializeBinaryBulk(column, *getter(path), limit, avg_value_size_hint);
}
}


} // namespace DB
31 changes: 31 additions & 0 deletions dbms/src/DataTypes/DataTypeString.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,37 @@ class DataTypeString final : public IDataType
bool isString() const override { return true; }
bool isCategorial() const override { return true; }
bool canBeInsideNullable() const override { return true; }

void enumerateStreams(const StreamCallback & callback, SubstreamPath & path) const override;

void serializeBinaryBulkWithMultipleStreams(
const IColumn & column,
const OutputStreamGetter & getter,
size_t offset,
size_t limit,
bool position_independent_encoding,
SubstreamPath & path) const override;

void deserializeBinaryBulkWithMultipleStreams(
IColumn & column,
const InputStreamGetter & getter,
size_t limit,
double avg_value_size_hint,
bool position_independent_encoding,
SubstreamPath & path) const override;

DataTypeString(Int32 version_ = 1)
: version(version_)
{
assert(version == 0 || version == 1);
}
Int32 getVersion() const { return version; }

private:
// `version` == 0, use size-prefix format in serialization/deserialization.
// `version` == 1, seperate sizes and chars in serialization/deserialization.
// Default value is 1, 0 is use to read old data.
const Int32 version = 1;
};

} // namespace DB
9 changes: 9 additions & 0 deletions dbms/src/DataTypes/IDataType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ bool IDataType::isArraySizes(const SubstreamPath & path)
return false;
}

bool IDataType::isStringSizes(const SubstreamPath & path)
{
return std::any_of(path.cbegin(), path.cend(), [](const auto & elem) {
return elem.type == IDataType::Substream::StringSizes;
});
}

String IDataType::getFileNameForStream(const String & column_name, const IDataType::SubstreamPath & path)
{
String nested_table_name = Nested::extractTableName(column_name);
Expand All @@ -127,6 +134,8 @@ String IDataType::getFileNameForStream(const String & column_name, const IDataTy
/// and name is encoded as a whole.
stream_name += "%2E" + escapeForFileName(elem.tuple_element_name);
}
else if (elem.type == Substream::StringSizes)
stream_name += ".size";
}
return stream_name;
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataTypes/IDataType.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class IDataType : private boost::noncopyable
NullMap,

TupleElement,

StringSizes,
};
Type type;

Expand Down Expand Up @@ -421,6 +423,7 @@ class IDataType : private boost::noncopyable

static bool isNullMap(const SubstreamPath & path);
static bool isArraySizes(const SubstreamPath & path);
static bool isStringSizes(const SubstreamPath & path);
};


Expand Down
Loading

0 comments on commit d925107

Please sign in to comment.