Skip to content

Commit

Permalink
gc be binlog metas when tablet is dropped.
Browse files Browse the repository at this point in the history
  • Loading branch information
deadlinefen committed Aug 3, 2023
1 parent 3447a70 commit 9f74166
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 38 deletions.
5 changes: 3 additions & 2 deletions be/src/olap/binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
#include "olap/olap_common.h"

namespace doris {
constexpr std::string_view kBinlogPrefix = "binglog_";
constexpr std::string_view kBinlogPrefix = "binlog_";
constexpr std::string_view kBinlogMetaPrefix = "binlog_meta_";
constexpr std::string_view kBinlogDataPrefix = "binlog_data_";

inline auto make_binlog_meta_key(std::string_view tablet, int64_t version,
std::string_view rowset) {
Expand Down Expand Up @@ -82,7 +83,7 @@ inline bool starts_with_binlog_meta(std::string_view str) {
}

inline std::string get_binlog_data_key_from_meta_key(std::string_view meta_key) {
// like "binglog_meta_6943f1585fe834b5-e542c2b83a21d0b7" => "binglog_data-6943f1585fe834b5-e542c2b83a21d0b7"
// like "binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7" => "binlog_data-6943f1585fe834b5-e542c2b83a21d0b7"
return fmt::format("{}data_{}", kBinlogPrefix, meta_key.substr(kBinlogMetaPrefix.length()));
}
} // namespace doris
10 changes: 8 additions & 2 deletions be/src/olap/olap_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,18 @@ Status OlapMeta::remove(const int column_family_index, const std::vector<std::st

Status OlapMeta::iterate(const int column_family_index, const std::string& prefix,
std::function<bool(const std::string&, const std::string&)> const& func) {
return iterate(column_family_index, prefix, prefix, func);
}

Status OlapMeta::iterate(const int column_family_index, const std::string& seek_key,
const std::string& prefix,
std::function<bool(const std::string&, const std::string&)> const& func) {
auto& handle = _handles[column_family_index];
std::unique_ptr<Iterator> it(_db->NewIterator(ReadOptions(), handle.get()));
if (prefix == "") {
if (seek_key == "") {
it->SeekToFirst();
} else {
it->Seek(prefix);
it->Seek(seek_key);
}
rocksdb::Status status = it->status();
if (!status.ok()) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/olap_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class OlapMeta final {

Status iterate(const int column_family_index, const std::string& prefix,
std::function<bool(const std::string&, const std::string&)> const& func);
Status iterate(const int column_family_index, const std::string& seek_key,
const std::string& prefix,
std::function<bool(const std::string&, const std::string&)> const& func);

std::string get_root_path() const { return _root_path; }

Expand Down
46 changes: 44 additions & 2 deletions be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ std::vector<std::string> RowsetMetaManager::get_binlog_filenames(OlapMeta* meta,
auto traverse_func = [&rowset_id, &num_segments](const std::string& key,
const std::string& value) -> bool {
VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
// key is 'binglog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
// key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
// check starts with "binlog_meta_"
if (!starts_with_binlog_meta(key)) {
LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
Expand Down Expand Up @@ -229,7 +229,7 @@ std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info(
auto traverse_func = [&rowset_id, &num_segments](const std::string& key,
const std::string& value) -> bool {
VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
// key is 'binglog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
// key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
auto pos = key.rfind('_');
if (pos == std::string::npos) {
LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
Expand Down Expand Up @@ -283,6 +283,11 @@ Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const Row
return status;
}

Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffix) {
return meta->remove(META_COLUMN_FAMILY_INDEX,
{kBinlogMetaPrefix.data() + suffix, kBinlogDataPrefix.data() + suffix});
}

Status RowsetMetaManager::traverse_rowset_metas(
OlapMeta* meta,
std::function<bool(const TabletUid&, const RowsetId&, const std::string&)> const& func) {
Expand All @@ -307,6 +312,43 @@ Status RowsetMetaManager::traverse_rowset_metas(
return status;
}

Status RowsetMetaManager::traverse_binlog_metas(
OlapMeta* meta, std::function<bool(const string&, const string&, bool)> const& collector) {
std::pair<std::string, bool> last_info = std::make_pair(kBinlogMetaPrefix.data(), false);
bool seek_found = false;
Status status;
auto traverse_binlog_meta_func = [&last_info, &seek_found, &collector](
const std::string& key,
const std::string& value) -> bool {
seek_found = true;
auto& [last_prefix, need_collect] = last_info;
size_t pos = key.find('_', kBinlogMetaPrefix.size());
if (pos == std::string::npos) {
LOG(WARNING) << "invalid binlog meta key: " << key;
return true;
}
std::string_view key_view(key.data(), pos);
std::string_view last_prefix_view(last_prefix.data(), last_prefix.size() - 1);

if (last_prefix_view != key_view) {
need_collect = collector(key, value, true);
last_prefix = std::string(key_view) + "~";
} else if (need_collect) {
collector(key, value, false);
}

return need_collect;
};

do {
seek_found = false;
status = meta->iterate(META_COLUMN_FAMILY_INDEX, last_info.first, kBinlogMetaPrefix.data(),
traverse_binlog_meta_func);
} while (status.ok() && seek_found);

return status;
}

Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* meta,
const std::string& rowset_meta_path) {
std::ifstream infile(rowset_meta_path);
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,14 @@ class RowsetMetaManager {

static Status remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id);

static Status traverse_rowset_metas(
OlapMeta* meta,
std::function<bool(const TabletUid&, const RowsetId&, const std::string&)> const& func);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);

static Status traverse_rowset_metas(OlapMeta* meta,
std::function<bool(const TabletUid&, const RowsetId&,
const std::string&)> const& collector);

static Status traverse_binlog_metas(
OlapMeta* meta, std::function<bool(const string&, const string&, bool)> const& func);

static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path);

Expand Down
36 changes: 36 additions & 0 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
// clean unused rowset metas in OlapMeta
_clean_unused_rowset_metas();

// clean unused binlog metas in OlapMeta
_clean_unused_binlog_metas();

// cleand unused delete bitmap for deleted tablet
_clean_unused_delete_bitmap();

Expand Down Expand Up @@ -774,6 +777,39 @@ void StorageEngine::_clean_unused_rowset_metas() {
}
}

void StorageEngine::_clean_unused_binlog_metas() {
std::vector<std::string> unused_binlog_key_suffixes;
auto unused_binlog_collector = [this, &unused_binlog_key_suffixes](const std::string& key,
const std::string& value,
bool need_check) -> bool {
if (need_check) {
BinlogMetaEntryPB binlog_meta_pb;
if (UNLIKELY(!binlog_meta_pb.ParseFromString(value))) {
LOG(WARNING) << "parse rowset meta string failed for binlog meta key: " << key;
} else if (_tablet_manager->get_tablet(binlog_meta_pb.tablet_id()) == nullptr) {
LOG(INFO) << "failed to find tablet " << binlog_meta_pb.tablet_id()
<< " for binlog rowset: " << binlog_meta_pb.rowset_id()
<< ", tablet may be dropped";
} else {
return false;
}
}

unused_binlog_key_suffixes.emplace_back(key.substr(kBinlogMetaPrefix.size()));
return true;
};
auto data_dirs = get_stores();
for (auto data_dir : data_dirs) {
RowsetMetaManager::traverse_binlog_metas(data_dir->get_meta(), unused_binlog_collector);
for (const auto& suffix : unused_binlog_key_suffixes) {
RowsetMetaManager::remove_binlog(data_dir->get_meta(), suffix);
}
LOG(INFO) << "remove " << unused_binlog_key_suffixes.size()
<< " invalid binlog meta from dir: " << data_dir->path();
unused_binlog_key_suffixes.clear();
}
}

void StorageEngine::_clean_unused_delete_bitmap() {
std::unordered_set<int64_t> removed_tablets;
auto clean_delete_bitmap_func = [this, &removed_tablets](int64_t tablet_id, int64_t version,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ class StorageEngine {

void _clean_unused_rowset_metas();

void _clean_unused_binlog_metas();

void _clean_unused_delete_bitmap();

void _clean_unused_pending_publish_info();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3569,7 +3569,7 @@ void Tablet::gc_binlogs(int64_t version) {
if (binlog_meta_entry_pb.has_rowset_id_v2()) {
rowset_id = binlog_meta_entry_pb.rowset_id_v2();
} else {
// key is 'binglog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
// key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
auto pos = key.rfind('_');
if (pos == std::string::npos) {
LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.gson.annotations.SerializedName;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class BinlogTombstone {
Expand All @@ -36,12 +35,6 @@ public class BinlogTombstone {
@SerializedName(value = "commitSeq")
private long commitSeq;

// TODO(deadlinefen): delete this field later
// This is a reserved field for the transition between new and old versions.
// It will be deleted later
@SerializedName(value = "tableIds")
private List<Long> tableIds;

@SerializedName(value = "tableCommitSeqMap")
private Map<Long, Long> tableCommitSeqMap;

Expand All @@ -54,15 +47,13 @@ public BinlogTombstone(long dbId, boolean isDbTombstone) {
this.dbBinlogTombstone = isDbTombstone;
this.dbId = dbId;
this.commitSeq = -1;
this.tableIds = Collections.emptyList();
this.tableCommitSeqMap = Maps.newHashMap();
}

public BinlogTombstone(long tableId, long commitSeq) {
this.dbBinlogTombstone = false;
this.dbId = -1;
this.commitSeq = commitSeq;
this.tableIds = Collections.emptyList();
this.tableCommitSeqMap = Collections.singletonMap(tableId, commitSeq);
}

Expand Down Expand Up @@ -92,14 +83,6 @@ public long getDbId() {
return dbId;
}

// TODO(deadlinefen): deprecated this code later
public List<Long> getTableIds() {
if (tableIds == null) {
tableIds = Collections.emptyList();
}
return tableIds;
}

public Map<Long, Long> getTableCommitSeqMap() {
if (tableCommitSeqMap == null) {
tableCommitSeqMap = Maps.newHashMap();
Expand Down
11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -424,17 +424,6 @@ public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
}

Map<Long, Long> tableCommitSeqMap = tombstone.getTableCommitSeqMap();
// TODO(deadlinefen): delete this code
// This is a reserved code for the transition between new and old versions.
// It will be deleted later
if (tableCommitSeqMap.isEmpty()) {
long commitSeq = tombstone.getCommitSeq();
List<Long> tableIds = tombstone.getTableIds();
for (long tableId : tableIds) {
tableCommitSeqMap.put(tableId, commitSeq);
}
}

for (TableBinlog tableBinlog : tableBinlogs) {
long tableId = tableBinlog.getTableId();
if (tableCommitSeqMap.containsKey(tableId)) {
Expand Down

0 comments on commit 9f74166

Please sign in to comment.