Skip to content

Commit

Permalink
Storage: let stable meta using protobuf format (#9054)
Browse files Browse the repository at this point in the history
ref #9036
  • Loading branch information
Lloyd-Pottiger authored May 17, 2024
1 parent ca030a3 commit d33545d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 52 deletions.
10 changes: 10 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/dtpb/dmfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,13 @@ message ColumnStat {
message ColumnStats {
repeated ColumnStat column_stats = 1;
}

message StableFile {
optional uint64 page_id = 1;
}

message StableLayerMeta {
optional uint64 valid_rows = 1;
optional uint64 valid_bytes = 2;
repeated StableFile files = 3;
}
122 changes: 83 additions & 39 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Storages/Page/V3/Universal/UniversalPageStorage.h>
#include <Storages/PathPool.h>


namespace DB
{
namespace ErrorCodes
Expand Down Expand Up @@ -82,31 +83,94 @@ void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb)
{
MemoryWriteBuffer buf(0, 8192);
// The method must call `buf.count()` to get the last seralized size before `buf.tryGetReadBuffer`
auto data_size = saveMeta(buf);
auto data_size = serializeMetaToBuf(buf);
meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size);
}

UInt64 StableValueSpace::saveMeta(WriteBuffer & buf) const
UInt64 StableValueSpace::serializeMetaToBuf(WriteBuffer & buf) const
{
writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf);
writeIntBinary(valid_rows, buf);
writeIntBinary(valid_bytes, buf);
writeIntBinary(static_cast<UInt64>(files.size()), buf);
for (const auto & f : files)
writeIntBinary(f->pageId(), buf);

if (likely(STORAGE_FORMAT_CURRENT.stable == StableFormat::V1))
{
writeIntBinary(valid_rows, buf);
writeIntBinary(valid_bytes, buf);
writeIntBinary(static_cast<UInt64>(files.size()), buf);
for (const auto & f : files)
writeIntBinary(f->pageId(), buf);
}
else if (STORAGE_FORMAT_CURRENT.stable == StableFormat::V2)
{
dtpb::StableLayerMeta meta;
meta.set_valid_rows(valid_rows);
meta.set_valid_bytes(valid_bytes);
for (const auto & f : files)
meta.add_files()->set_page_id(f->pageId());

auto data = meta.SerializeAsString();
writeStringBinary(data, buf);
}
else
{
throw Exception("Unexpected version: {}", STORAGE_FORMAT_CURRENT.stable);
}
return buf.count();
}

namespace
{
dtpb::StableLayerMeta derializeMetaV1FromBuf(ReadBuffer & buf)
{
dtpb::StableLayerMeta meta;
UInt64 valid_rows, valid_bytes, size;
readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);
readIntBinary(size, buf);
meta.set_valid_rows(valid_rows);
meta.set_valid_bytes(valid_bytes);
for (size_t i = 0; i < size; ++i)
{
UInt64 page_id;
readIntBinary(page_id, buf);
meta.add_files()->set_page_id(page_id);
}
return meta;
}

dtpb::StableLayerMeta derializeMetaV2FromBuf(ReadBuffer & buf)
{
dtpb::StableLayerMeta meta;
String data;
readStringBinary(data, buf);
RUNTIME_CHECK_MSG(
meta.ParseFromString(data),
"Failed to parse StableLayerMeta from string: {}",
Redact::keyToHexString(data.data(), data.size()));
return meta;
}

dtpb::StableLayerMeta derializeMetaFromBuf(ReadBuffer & buf)
{
UInt64 version;
readIntBinary(version, buf);
if (version == StableFormat::V1)
return derializeMetaV1FromBuf(buf);
else if (version == StableFormat::V2)
return derializeMetaV2FromBuf(buf);
else
throw Exception("Unexpected version: {}", version);
}
} // namespace

std::string StableValueSpace::serializeMeta() const
{
WriteBufferFromOwnString wb;
saveMeta(wb);
serializeMetaToBuf(wb);
return wb.releaseStr();
}

StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, PageIdU64 id)
{
// read meta page
Page page = dm_context.storage_pool->metaReader()->read(id); // not limit restore
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
return StableValueSpace::restore(dm_context, buf, id);
Expand All @@ -116,20 +180,11 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, ReadBuffer
{
auto stable = std::make_shared<StableValueSpace>(id);

UInt64 version, valid_rows, valid_bytes, size;
readIntBinary(version, buf);
if (version != StableFormat::V1)
throw Exception("Unexpected version: " + DB::toString(version));

readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);
readIntBinary(size, buf);
UInt64 page_id;
auto metapb = derializeMetaFromBuf(buf);
auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
for (size_t i = 0; i < size; ++i)
for (int i = 0; i < metapb.files().size(); ++i)
{
readIntBinary(page_id, buf);

UInt64 page_id = metapb.files(i).page_id();
DMFilePtr dmfile;
auto path_delegate = dm_context.path_pool->getStableDiskDelegator();
if (remote_data_store)
Expand Down Expand Up @@ -170,8 +225,8 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, ReadBuffer
stable->files.push_back(dmfile);
}

stable->valid_rows = valid_rows;
stable->valid_bytes = valid_bytes;
stable->valid_rows = metapb.valid_rows();
stable->valid_bytes = metapb.valid_bytes();

return stable;
}
Expand All @@ -192,22 +247,11 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( //
ReadBufferFromMemory buf(page.data.begin(), page.data.size());

// read stable meta info
UInt64 version, valid_rows, valid_bytes, size;
{
readIntBinary(version, buf);
if (version != StableFormat::V1)
throw Exception("Unexpected version: " + DB::toString(version));

readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);
readIntBinary(size, buf);
}

auto metapb = derializeMetaFromBuf(buf);
auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
for (size_t i = 0; i < size; ++i)
for (int i = 0; i < metapb.files().size(); ++i)
{
UInt64 page_id;
readIntBinary(page_id, buf);
UInt64 page_id = metapb.files(i).page_id();
auto full_page_id = UniversalPageIdFormat::toFullPageId(
UniversalPageIdFormat::toFullPrefix(
dm_context.keyspace_id,
Expand All @@ -234,8 +278,8 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( //
stable->files.push_back(dmfile);
}

stable->valid_rows = valid_rows;
stable->valid_bytes = valid_bytes;
stable->valid_rows = metapb.valid_rows();
stable->valid_bytes = metapb.valid_bytes();

return stable;
}
Expand Down
24 changes: 12 additions & 12 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ using StableValueSpacePtr = std::shared_ptr<StableValueSpace>;
class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
{
public:
StableValueSpace(PageIdU64 id_)
explicit StableValueSpace(PageIdU64 id_)
: id(id_)
, log(Logger::get())
{}
Expand Down Expand Up @@ -127,7 +127,7 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
// number of rows having at least one version(include delete)
UInt64 num_rows;

const String toDebugString() const
String toDebugString() const
{
return "StableProperty: gc_hint_version [" + std::to_string(this->gc_hint_version) + "] num_versions ["
+ std::to_string(this->num_versions) + "] num_puts[" + std::to_string(this->num_puts) + "] num_rows["
Expand All @@ -148,18 +148,18 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
{
StableValueSpacePtr stable;

PageIdU64 id;
UInt64 valid_rows;
UInt64 valid_bytes;
PageIdU64 id{};
UInt64 valid_rows{};
UInt64 valid_bytes{};

bool is_common_handle;
size_t rowkey_column_size;
bool is_common_handle{};
size_t rowkey_column_size{};

/// TODO: The members below are not actually snapshots, they should not be here.

ColumnCachePtrs column_caches;

Snapshot(StableValueSpacePtr stable_)
explicit Snapshot(StableValueSpacePtr stable_)
: stable(stable_)
, log(stable->log)
{}
Expand Down Expand Up @@ -263,19 +263,19 @@ class StableValueSpace : public std::enable_shared_from_this<StableValueSpace>
size_t avgRowBytes(const ColumnDefines & read_columns);

private:
UInt64 saveMeta(WriteBuffer & buf) const;
UInt64 serializeMetaToBuf(WriteBuffer & buf) const;

private:
const PageIdU64 id;

// Valid rows is not always the sum of rows in file,
// because after logical split, two segments could reference to a same file.
UInt64 valid_rows; /* At most. The actual valid rows may be lower than this value. */
UInt64 valid_bytes; /* At most. The actual valid bytes may be lower than this value. */
UInt64 valid_rows{}; /* At most. The actual valid rows may be lower than this value. */
UInt64 valid_bytes{}; /* At most. The actual valid bytes may be lower than this value. */

DMFiles files;

StableProperty property;
StableProperty property{};
std::atomic<bool> is_property_cached = false;

LoggerPtr log;
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,22 @@ try
}
CATCH

TEST_F(SegmentOperationTest, CurrentV2RestoreFromStableV1)
try
{
auto current = STORAGE_FORMAT_CURRENT;
STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V5;
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V6;
auto segment = Segment::restoreSegment(log, *dm_context, DELTA_MERGE_FIRST_SEGMENT_ID);
ASSERT_EQ(segment->stable->getRows(), 100);
STORAGE_FORMAT_CURRENT = current;
}
CATCH

TEST_F(SegmentOperationTest, WriteDuringSegmentSplit)
try
{
Expand Down
26 changes: 25 additions & 1 deletion dbms/src/Storages/FormatVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace StableFormat
using Version = Int64;

inline static constexpr Version V1 = 1;
inline static constexpr Version V2 = 2; // Meta using protobuf
} // namespace StableFormat

namespace DeltaFormat
Expand Down Expand Up @@ -130,6 +131,15 @@ inline static const StorageFormatVersion STORAGE_FORMAT_V5 = StorageFormatVersio
.identifier = 5,
};

inline static const StorageFormatVersion STORAGE_FORMAT_V6 = StorageFormatVersion{
.segment = SegmentFormat::V2,
.dm_file = DMFileFormat::V3,
.stable = StableFormat::V2, // diff
.delta = DeltaFormat::V3,
.page = PageFormat::V3,
.identifier = 6,
};

// STORAGE_FORMAT_V100 is used for S3 only
inline static const StorageFormatVersion STORAGE_FORMAT_V100 = StorageFormatVersion{
.segment = SegmentFormat::V2,
Expand All @@ -140,6 +150,16 @@ inline static const StorageFormatVersion STORAGE_FORMAT_V100 = StorageFormatVers
.identifier = 100,
};

// STORAGE_FORMAT_V101 is used for S3 only
inline static const StorageFormatVersion STORAGE_FORMAT_V101 = StorageFormatVersion{
.segment = SegmentFormat::V2,
.dm_file = DMFileFormat::V3,
.stable = StableFormat::V2, // diff
.delta = DeltaFormat::V3,
.page = PageFormat::V4,
.identifier = 101,
};

inline StorageFormatVersion STORAGE_FORMAT_CURRENT = STORAGE_FORMAT_V5;

inline const StorageFormatVersion & toStorageFormat(UInt64 setting)
Expand All @@ -156,10 +176,14 @@ inline const StorageFormatVersion & toStorageFormat(UInt64 setting)
return STORAGE_FORMAT_V4;
case 5:
return STORAGE_FORMAT_V5;
case 6:
return STORAGE_FORMAT_V6;
case 100:
return STORAGE_FORMAT_V100;
case 101:
return STORAGE_FORMAT_V101;
default:
throw Exception("Illegal setting value: " + DB::toString(setting));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal setting value: {}", setting);
}
}

Expand Down

0 comments on commit d33545d

Please sign in to comment.