Skip to content

Commit

Permalink
Storage: support restore segments from checkpoint with format version V7
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Aug 23, 2024
1 parent 2652edb commit 861c68f
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 76 deletions.
36 changes: 25 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@
#include <Storages/PathPool.h>


namespace DB
namespace DB::DM
{
namespace DM
{
ColumnFileBig::ColumnFileBig(const DMContext & context, const DMFilePtr & file_, const RowKeyRange & segment_range_)

ColumnFileBig::ColumnFileBig(const DMContext & dm_context, const DMFilePtr & file_, const RowKeyRange & segment_range_)
: file(file_)
, segment_range(segment_range_)
{
calculateStat(context);
calculateStat(dm_context);
}

void ColumnFileBig::calculateStat(const DMContext & dm_context)
Expand Down Expand Up @@ -66,11 +65,11 @@ void ColumnFileBig::removeData(WriteBatches & wbs) const
}

ColumnFileReaderPtr ColumnFileBig::getReader(
const DMContext & context,
const DMContext & dm_context,
const IColumnFileDataProviderPtr &,
const ColumnDefinesPtr & col_defs) const
{
return std::make_shared<ColumnFileBigReader>(context, *this, col_defs);
return std::make_shared<ColumnFileBigReader>(dm_context, *this, col_defs);
}

void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
Expand Down Expand Up @@ -120,8 +119,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
}

ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
[[maybe_unused]] const LoggerPtr & parent_log,
DMContext & dm_context, //
DMContext & dm_context,
const RowKeyRange & target_range,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
Expand All @@ -140,6 +138,23 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
return std::shared_ptr<ColumnFileBig>(dp_file);
}

/// Rebuild a ColumnFileBig from a protobuf `dtpb::ColumnFileBig` record taken from a checkpoint.
///
/// The page id stored in `cf_pb` is handed to `restoreDMFileFromCheckpoint` together with the
/// remote data store of the shared disaggregated context, `temp_ps` and `wbs`. The valid
/// rows/bytes recorded in `cf_pb` are passed straight to the ColumnFileBig constructor along
/// with `target_range`.
ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
    DMContext & dm_context,
    const RowKeyRange & target_range,
    const dtpb::ColumnFileBig & cf_pb,
    UniversalPageStoragePtr temp_ps,
    WriteBatches & wbs)
{
    // Everything we need from the protobuf record.
    UInt64 dmfile_page_id = cf_pb.id();
    size_t rows_in_range = cf_pb.valid_rows();
    size_t bytes_in_range = cf_pb.valid_bytes();

    auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
    auto restored_file = restoreDMFileFromCheckpoint(dm_context, data_store, temp_ps, wbs, dmfile_page_id);

    // Ownership of the raw pointer is taken by the shared_ptr immediately.
    return std::shared_ptr<ColumnFileBig>(
        new ColumnFileBig(restored_file, rows_in_range, bytes_in_range, target_range));
}

void ColumnFileBigReader::initStream()
{
if (file_stream)
Expand Down Expand Up @@ -370,5 +385,4 @@ void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
30 changes: 19 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ class ColumnFileBig : public ColumnFilePersisted
, segment_range(segment_range_)
{}

void calculateStat(const DMContext & context);
void calculateStat(const DMContext & dm_context);

public:
ColumnFileBig(const DMContext & context, const DMFilePtr & file_, const RowKeyRange & segment_range_);
ColumnFileBig(const DMContext & dm_context, const DMFilePtr & file_, const RowKeyRange & segment_range_);

ColumnFileBig(const ColumnFileBig &) = default;

ColumnFileBigPtr cloneWith(DMContext & context, const DMFilePtr & new_file, const RowKeyRange & new_segment_range)
ColumnFileBigPtr cloneWith(
DMContext & dm_context,
const DMFilePtr & new_file,
const RowKeyRange & new_segment_range)
{
auto * new_column_file = new ColumnFileBig(*this);
new_column_file->file = new_file;
new_column_file->segment_range = new_segment_range;
// update `valid_rows` and `valid_bytes` by `new_segment_range`
new_column_file->calculateStat(context);
new_column_file->calculateStat(dm_context);
return std::shared_ptr<ColumnFileBig>(new_column_file);
}

Expand All @@ -77,29 +80,34 @@ class ColumnFileBig : public ColumnFilePersisted
void removeData(WriteBatches & wbs) const override;

ColumnFileReaderPtr getReader(
const DMContext & context,
const DMContext & dm_context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const override;

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;
void serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool save_schema) const override;

static ColumnFilePersistedPtr deserializeMetadata(
const DMContext & context,
const DMContext & dm_context,
const RowKeyRange & segment_range,
ReadBuffer & buf);
static ColumnFilePersistedPtr deserializeMetadata(
const DMContext & context,
const DMContext & dm_context,
const RowKeyRange & segment_range,
const dtpb::ColumnFileBig & cf_pb);

static ColumnFilePersistedPtr createFromCheckpoint(
const LoggerPtr & parent_log,
DMContext & context, //
DMContext & dm_context,
const RowKeyRange & target_range,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs);
/// Overload of `createFromCheckpoint` that takes the column file description as a
/// `dtpb::ColumnFileBig` protobuf message instead of reading it from a ReadBuffer.
static ColumnFilePersistedPtr createFromCheckpoint(
DMContext & dm_context,
const RowKeyRange & target_range,
const dtpb::ColumnFileBig & cf_pb,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs);

/// Short human-readable summary: a `big_file` tag followed by the values of getRows() and getBytes().
String toString() const override { return fmt::format("{{big_file,rows:{},bytes:{}}}", getRows(), getBytes()); }

Expand Down Expand Up @@ -155,10 +163,10 @@ class ColumnFileBigReader : public ColumnFileReader

public:
ColumnFileBigReader(
const DMContext & context_,
const DMContext & dm_context_,
const ColumnFileBig & column_file_,
const ColumnDefinesPtr & col_defs_)
: dm_context(context_)
: dm_context(dm_context_)
, column_file(column_file_)
, col_defs(col_defs_)
{
Expand Down
35 changes: 18 additions & 17 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & c
break;
}
default:
throw Exception(
"Unexpected delta value version: " + DB::toString(STORAGE_FORMAT_CURRENT.delta),
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected delta value version: {}", STORAGE_FORMAT_CURRENT.delta);
}
}

Expand All @@ -162,17 +160,14 @@ ColumnFilePersisteds deserializeSavedColumnFiles(
DeltaFormat::Version version;
readIntBinary(version, buf);

ColumnFilePersisteds column_files;
switch (version)
{
// V1 and V2 share the same deserializer.
case DeltaFormat::V1:
case DeltaFormat::V2:
column_files = deserializeSavedColumnFilesInV2Format(context, buf, version);
break;
return deserializeSavedColumnFilesInV2Format(context, buf, version);
case DeltaFormat::V3:
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
break;
return deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
case DeltaFormat::V4:
{
dtpb::DeltaLayerMeta meta;
Expand All @@ -182,17 +177,15 @@ ColumnFilePersisteds deserializeSavedColumnFiles(
meta.ParseFromString(data),
"Failed to parse DeltaLayerMeta from string: {}",
Redact::keyToHexString(data.data(), data.size()));
column_files = deserializeSavedColumnFilesInV4Format(context, segment_range, meta);
break;
return deserializeSavedColumnFilesInV4Format(context, segment_range, meta);
}
default:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected delta value version: {}, latest version: {}",
version,
DeltaFormat::V3);
DeltaFormat::V4);
}
return column_files;
}

ColumnFilePersisteds createColumnFilesFromCheckpoint( //
Expand All @@ -207,20 +200,28 @@ ColumnFilePersisteds createColumnFilesFromCheckpoint( //
DeltaFormat::Version version;
readIntBinary(version, buf);

ColumnFilePersisteds column_files;
switch (version)
{
case DeltaFormat::V3:
column_files = createColumnFilesInV3FormatFromCheckpoint(parent_log, context, segment_range, buf, temp_ps, wbs);
break;
return createColumnFilesInV3FormatFromCheckpoint(parent_log, context, segment_range, buf, temp_ps, wbs);
case DeltaFormat::V4:
{
dtpb::DeltaLayerMeta meta;
String data;
readStringBinary(data, buf);
RUNTIME_CHECK_MSG(
meta.ParseFromString(data),
"Failed to parse DeltaLayerMeta from string: {}",
Redact::keyToHexString(data.data(), data.size()));
return createColumnFilesInV4FormatFromCheckpoint(parent_log, context, segment_range, meta, temp_ps, wbs);
}
default:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected delta value version: {}, latest version: {}",
version,
DeltaFormat::V3);
DeltaFormat::V4);
}
return column_files;
}

} // namespace DB::DM
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(
const RowKeyRange & segment_range,
ReadBuffer & buf);

ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint( //
ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint(
const LoggerPtr & parent_log,
DMContext & context,
const RowKeyRange & segment_range,
Expand All @@ -96,5 +96,13 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV4Format(
const DMContext & context,
const RowKeyRange & segment_range,
const dtpb::DeltaLayerMeta & meta);

/// Create persisted column files from a checkpoint whose delta layer metadata has already been
/// parsed into a `dtpb::DeltaLayerMeta`. This is the counterpart of
/// `createColumnFilesInV3FormatFromCheckpoint` for the DeltaFormat::V4 encoding.
ColumnFilePersisteds createColumnFilesInV4FormatFromCheckpoint(
const LoggerPtr & parent_log,
DMContext & context,
const RowKeyRange & segment_range,
const dtpb::DeltaLayerMeta & meta,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs);
} // namespace DM
} // namespace DB
71 changes: 54 additions & 17 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,16 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context);
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs)
WriteBatches & wbs,
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes)
{
auto schema = deserializeSchema(buf);
if (!schema)
schema = last_schema;
RUNTIME_CHECK(schema != nullptr);

PageIdU64 data_page_id;
size_t rows, bytes;

readIntBinary(data_page_id, buf);
readIntBinary(rows, buf);
readIntBinary(bytes, buf);
auto new_cf_id = context.storage_pool->newLogPageId();
/// Generate a new RemotePage with an entry with data location on S3
auto remote_page_id = UniversalPageIdFormat::toFullPageId(
Expand Down Expand Up @@ -273,9 +264,55 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));

auto column_file_schema = std::make_shared<ColumnFileSchema>(*schema);
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context);
}

/// Restore a ColumnFileTiny from the binary (ReadBuffer) checkpoint encoding.
///
/// Fields are consumed from `buf` in this order: schema, data page id, rows, bytes.
/// If `deserializeSchema` yields a null schema, `last_schema` is used instead; a schema
/// must be available one way or the other.
///
/// Returns the restored column file together with the schema that was used for it.
std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
    const LoggerPtr & parent_log,
    const DMContext & context,
    ReadBuffer & buf,
    UniversalPageStoragePtr temp_ps,
    const BlockPtr & last_schema,
    WriteBatches & wbs)
{
    // Schema first; fall back to the caller-provided one when none was read.
    auto schema = deserializeSchema(buf);
    if (schema == nullptr)
        schema = last_schema;
    RUNTIME_CHECK(schema != nullptr);

    // The remaining fixed-size fields must be read in exactly this order.
    PageIdU64 data_page_id = 0;
    size_t rows = 0;
    size_t bytes = 0;
    readIntBinary(data_page_id, buf);
    readIntBinary(rows, buf);
    readIntBinary(bytes, buf);

    auto column_file
        = restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes);
    return {std::move(column_file), std::move(schema)};
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
const dtpb::ColumnFileTiny & cf_pb,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs)
{
auto schema = deserializeSchema(cf_pb.columns());
if (!schema)
schema = last_schema;
RUNTIME_CHECK(schema != nullptr);

PageIdU64 data_page_id = cf_pb.id();
size_t rows = cf_pb.rows();
size_t bytes = cf_pb.bytes();

return {
std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context),
std::move(schema),
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
schema,
};
}

Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,29 @@ class ColumnFileTiny : public ColumnFilePersisted
const dtpb::ColumnFileTiny & cf_pb,
ColumnFileSchemaPtr & last_schema);

/// Shared helper behind both `createFromCheckpoint` overloads: they decode `schema`,
/// `data_page_id`, `rows` and `bytes` from their respective encodings and forward them here.
/// Returns the restored column file.
static ColumnFilePersistedPtr restoreFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs,
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes);
/// Restore a ColumnFileTiny from the binary encoding read out of `buf`.
/// `last_schema` is used when no schema is deserialized for this file.
/// Returns the restored column file and the schema that was used.
static std::tuple<ColumnFilePersistedPtr, BlockPtr> createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs);
/// Restore a ColumnFileTiny from a `dtpb::ColumnFileTiny` protobuf message.
/// `last_schema` is used when no schema is deserialized from `cf_pb.columns()`.
/// Returns the restored column file and the schema that was used.
static std::tuple<ColumnFilePersistedPtr, BlockPtr> createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
const dtpb::ColumnFileTiny & cf_pb,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs);

bool mayBeFlushedFrom(ColumnFile * from_file) const override
{
Expand Down
Loading

0 comments on commit 861c68f

Please sign in to comment.