Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: support restore segments from checkpoint with format version V7 #9343

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@
#include <Storages/PathPool.h>


namespace DB
namespace DB::DM
{
namespace DM
{
ColumnFileBig::ColumnFileBig(const DMContext & context, const DMFilePtr & file_, const RowKeyRange & segment_range_)

ColumnFileBig::ColumnFileBig(const DMContext & dm_context, const DMFilePtr & file_, const RowKeyRange & segment_range_)
: file(file_)
, segment_range(segment_range_)
{
calculateStat(context);
calculateStat(dm_context);
}

void ColumnFileBig::calculateStat(const DMContext & dm_context)
Expand Down Expand Up @@ -66,11 +65,11 @@ void ColumnFileBig::removeData(WriteBatches & wbs) const
}

ColumnFileReaderPtr ColumnFileBig::getReader(
const DMContext & context,
const DMContext & dm_context,
const IColumnFileDataProviderPtr &,
const ColumnDefinesPtr & col_defs) const
{
return std::make_shared<ColumnFileBigReader>(context, *this, col_defs);
return std::make_shared<ColumnFileBigReader>(dm_context, *this, col_defs);
}

void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
Expand Down Expand Up @@ -120,8 +119,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
}

ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
[[maybe_unused]] const LoggerPtr & parent_log,
DMContext & dm_context, //
DMContext & dm_context,
const RowKeyRange & target_range,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
Expand All @@ -140,6 +138,23 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
return std::shared_ptr<ColumnFileBig>(dp_file);
}

ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
DMContext & dm_context,
const RowKeyRange & target_range,
const dtpb::ColumnFileBig & cf_pb,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs)
{
UInt64 file_page_id = cf_pb.id();
size_t valid_rows = cf_pb.valid_rows();
size_t valid_bytes = cf_pb.valid_bytes();

auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
auto dmfile = restoreDMFileFromCheckpoint(dm_context, remote_data_store, temp_ps, wbs, file_page_id);
auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, target_range);
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
return std::shared_ptr<ColumnFileBig>(dp_file);
}

void ColumnFileBigReader::initStream()
{
if (file_stream)
Expand Down Expand Up @@ -370,5 +385,4 @@ void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
read_tag = read_tag_;
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
30 changes: 19 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ class ColumnFileBig : public ColumnFilePersisted
, segment_range(segment_range_)
{}

void calculateStat(const DMContext & context);
void calculateStat(const DMContext & dm_context);

public:
ColumnFileBig(const DMContext & context, const DMFilePtr & file_, const RowKeyRange & segment_range_);
ColumnFileBig(const DMContext & dm_context, const DMFilePtr & file_, const RowKeyRange & segment_range_);

ColumnFileBig(const ColumnFileBig &) = default;

ColumnFileBigPtr cloneWith(DMContext & context, const DMFilePtr & new_file, const RowKeyRange & new_segment_range)
ColumnFileBigPtr cloneWith(
DMContext & dm_context,
const DMFilePtr & new_file,
const RowKeyRange & new_segment_range)
{
auto * new_column_file = new ColumnFileBig(*this);
new_column_file->file = new_file;
new_column_file->segment_range = new_segment_range;
// update `valid_rows` and `valid_bytes` by `new_segment_range`
new_column_file->calculateStat(context);
new_column_file->calculateStat(dm_context);
return std::shared_ptr<ColumnFileBig>(new_column_file);
}

Expand All @@ -77,29 +80,34 @@ class ColumnFileBig : public ColumnFilePersisted
void removeData(WriteBatches & wbs) const override;

ColumnFileReaderPtr getReader(
const DMContext & context,
const DMContext & dm_context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const override;

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;
void serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool save_schema) const override;

static ColumnFilePersistedPtr deserializeMetadata(
const DMContext & context,
const DMContext & dm_context,
const RowKeyRange & segment_range,
ReadBuffer & buf);
static ColumnFilePersistedPtr deserializeMetadata(
const DMContext & context,
const DMContext & dm_context,
const RowKeyRange & segment_range,
const dtpb::ColumnFileBig & cf_pb);

static ColumnFilePersistedPtr createFromCheckpoint(
const LoggerPtr & parent_log,
DMContext & context, //
DMContext & dm_context,
const RowKeyRange & target_range,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs);
static ColumnFilePersistedPtr createFromCheckpoint(
DMContext & dm_context,
const RowKeyRange & target_range,
const dtpb::ColumnFileBig & cf_pb,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs);

String toString() const override { return fmt::format("{{big_file,rows:{},bytes:{}}}", getRows(), getBytes()); }

Expand Down Expand Up @@ -155,10 +163,10 @@ class ColumnFileBigReader : public ColumnFileReader

public:
ColumnFileBigReader(
const DMContext & context_,
const DMContext & dm_context_,
const ColumnFileBig & column_file_,
const ColumnDefinesPtr & col_defs_)
: dm_context(context_)
: dm_context(dm_context_)
, column_file(column_file_)
, col_defs(col_defs_)
{
Expand Down
35 changes: 18 additions & 17 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,7 @@ void serializeSavedColumnFiles(WriteBuffer & buf, const ColumnFilePersisteds & c
break;
}
default:
throw Exception(
"Unexpected delta value version: " + DB::toString(STORAGE_FORMAT_CURRENT.delta),
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected delta value version: {}", STORAGE_FORMAT_CURRENT.delta);
}
}

Expand All @@ -162,17 +160,14 @@ ColumnFilePersisteds deserializeSavedColumnFiles(
DeltaFormat::Version version;
readIntBinary(version, buf);

ColumnFilePersisteds column_files;
switch (version)
{
// V1 and V2 share the same deserializer.
case DeltaFormat::V1:
case DeltaFormat::V2:
column_files = deserializeSavedColumnFilesInV2Format(context, buf, version);
break;
return deserializeSavedColumnFilesInV2Format(context, buf, version);
case DeltaFormat::V3:
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
break;
return deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
case DeltaFormat::V4:
{
dtpb::DeltaLayerMeta meta;
Expand All @@ -182,17 +177,15 @@ ColumnFilePersisteds deserializeSavedColumnFiles(
meta.ParseFromString(data),
"Failed to parse DeltaLayerMeta from string: {}",
Redact::keyToHexString(data.data(), data.size()));
column_files = deserializeSavedColumnFilesInV4Format(context, segment_range, meta);
break;
return deserializeSavedColumnFilesInV4Format(context, segment_range, meta);
}
default:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected delta value version: {}, latest version: {}",
version,
DeltaFormat::V3);
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
DeltaFormat::V4);
}
return column_files;
}

ColumnFilePersisteds createColumnFilesFromCheckpoint( //
Expand All @@ -207,20 +200,28 @@ ColumnFilePersisteds createColumnFilesFromCheckpoint( //
DeltaFormat::Version version;
readIntBinary(version, buf);

ColumnFilePersisteds column_files;
switch (version)
{
case DeltaFormat::V3:
column_files = createColumnFilesInV3FormatFromCheckpoint(parent_log, context, segment_range, buf, temp_ps, wbs);
break;
return createColumnFilesInV3FormatFromCheckpoint(parent_log, context, segment_range, buf, temp_ps, wbs);
case DeltaFormat::V4:
{
dtpb::DeltaLayerMeta meta;
String data;
readStringBinary(data, buf);
RUNTIME_CHECK_MSG(
meta.ParseFromString(data),
"Failed to parse DeltaLayerMeta from string: {}",
Redact::keyToHexString(data.data(), data.size()));
return createColumnFilesInV4FormatFromCheckpoint(parent_log, context, segment_range, meta, temp_ps, wbs);
}
default:
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Unexpected delta value version: {}, latest version: {}",
version,
DeltaFormat::V3);
DeltaFormat::V4);
}
return column_files;
}

} // namespace DB::DM
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(
const RowKeyRange & segment_range,
ReadBuffer & buf);

ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint( //
ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint(
const LoggerPtr & parent_log,
DMContext & context,
const RowKeyRange & segment_range,
Expand All @@ -96,5 +96,13 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV4Format(
const DMContext & context,
const RowKeyRange & segment_range,
const dtpb::DeltaLayerMeta & meta);

ColumnFilePersisteds createColumnFilesInV4FormatFromCheckpoint(
const LoggerPtr & parent_log,
DMContext & context,
const RowKeyRange & segment_range,
const dtpb::DeltaLayerMeta & meta,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs);
} // namespace DM
} // namespace DB
71 changes: 54 additions & 17 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,16 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata(
return std::make_shared<ColumnFileTiny>(schema, rows, bytes, data_page_id, context);
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
ColumnFilePersistedPtr ColumnFileTiny::restoreFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs)
WriteBatches & wbs,
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes)
{
auto schema = deserializeSchema(buf);
if (!schema)
schema = last_schema;
RUNTIME_CHECK(schema != nullptr);

PageIdU64 data_page_id;
size_t rows, bytes;

readIntBinary(data_page_id, buf);
readIntBinary(rows, buf);
readIntBinary(bytes, buf);
auto new_cf_id = context.storage_pool->newLogPageId();
/// Generate a new RemotePage with an entry with data location on S3
auto remote_page_id = UniversalPageIdFormat::toFullPageId(
Expand Down Expand Up @@ -273,9 +264,55 @@ std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoin
wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets));

auto column_file_schema = std::make_shared<ColumnFileSchema>(*schema);
return std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context);
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs)
{
auto schema = deserializeSchema(buf);
if (!schema)
schema = last_schema;
RUNTIME_CHECK(schema != nullptr);

PageIdU64 data_page_id;
size_t rows, bytes;

readIntBinary(data_page_id, buf);
readIntBinary(rows, buf);
readIntBinary(bytes, buf);

return {
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
schema,
};
}

std::tuple<ColumnFilePersistedPtr, BlockPtr> ColumnFileTiny::createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
const dtpb::ColumnFileTiny & cf_pb,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs)
{
auto schema = deserializeSchema(cf_pb.columns());
if (!schema)
schema = last_schema;
RUNTIME_CHECK(schema != nullptr);

PageIdU64 data_page_id = cf_pb.id();
size_t rows = cf_pb.rows();
size_t bytes = cf_pb.bytes();

return {
std::make_shared<ColumnFileTiny>(column_file_schema, rows, bytes, new_cf_id, context),
std::move(schema),
restoreFromCheckpoint(parent_log, context, temp_ps, wbs, schema, data_page_id, rows, bytes),
schema,
};
}

Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,29 @@ class ColumnFileTiny : public ColumnFilePersisted
const dtpb::ColumnFileTiny & cf_pb,
ColumnFileSchemaPtr & last_schema);

static ColumnFilePersistedPtr restoreFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
UniversalPageStoragePtr temp_ps,
WriteBatches & wbs,
BlockPtr schema,
PageIdU64 data_page_id,
size_t rows,
size_t bytes);
static std::tuple<ColumnFilePersistedPtr, BlockPtr> createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
ReadBuffer & buf,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs);
static std::tuple<ColumnFilePersistedPtr, BlockPtr> createFromCheckpoint(
const LoggerPtr & parent_log,
const DMContext & context,
const dtpb::ColumnFileTiny & cf_pb,
UniversalPageStoragePtr temp_ps,
const BlockPtr & last_schema,
WriteBatches & wbs);

bool mayBeFlushedFrom(ColumnFile * from_file) const override
{
Expand Down
Loading