remove snapshotManager
dingxiaoshuai123 committed Apr 20, 2024
1 parent 2e8d4d6 commit 330760f
Showing 11 changed files with 212 additions and 175 deletions.
4 changes: 2 additions & 2 deletions save_load.sh
@@ -10,8 +10,8 @@ redis-cli -p 7777 raft.cluster init
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t set


redis-cli -p 7777 raft.node DSS
redis-cli -p 7777 raft.node DSS
redis-cli -p 7777 raft.node DOSNAPSHOT
redis-cli -p 7777 raft.node DOSNAPSHOT

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777

36 changes: 0 additions & 36 deletions src/checkpoint_manager.cc

This file was deleted.

37 changes: 0 additions & 37 deletions src/checkpoint_manager.h

This file was deleted.

91 changes: 78 additions & 13 deletions src/db.cc
@@ -7,7 +7,6 @@

#include "db.h"

#include "checkpoint_manager.h"
#include "config.h"
#include "praft/praft.h"
#include "pstd/log.h"
@@ -16,11 +15,11 @@ extern pikiwidb::PConfig g_config;

namespace pikiwidb {

DB::DB(int db_index, const std::string& db_path)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {
DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) {
storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = g_config.db_instance_num;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
@@ -37,31 +36,97 @@ DB::DB(int db_index, const std::string& db_path)
ERROR("Storage open failed! {}", s.ToString());
abort();
}

checkpoint_manager_ = std::make_unique<CheckpointManager>();
checkpoint_manager_->Init(g_config.db_instance_num, this);

opened_ = true;
INFO("Open DB{} success!", db_index_);
}

void DB::DoBgSave(const std::string& path, int i) {
void DB::DoCheckpoint(const std::string& path, int i) {
// 1) Always hold the storage's shared lock.
std::shared_lock sharedLock(storage_mutex_);

// 2)Create the storage's checkpoint
// 2) Create the checkpoint of rocksdb instance i.
auto status = storage_->CreateCheckpoint(path, i);
}

void DB::CreateCheckpoint(const std::string& path) {
void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) {
// 1) Already holding the mutual exclusion lock

// 2) Load the checkpoint of rocksdb i.
auto status = storage_->LoadCheckpoint(path, db_path, i);
}

void DB::CreateCheckpoint(const std::string& path, bool sync) {
auto tmp_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::CreatePath(tmp_path)) {
WARN("Create dir {} fail !", tmp_path);
return;
}
checkpoint_manager_->CreateCheckpoint(path);

std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, create a checkpoint for the specified rocksdb instance i.
// DB::DoCheckpoint always holds a read (shared) lock to protect the Storage
// corresponding to this rocksdb instance.
auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i);
result.push_back(std::move(res));
}
if (sync) {
for (auto& r : result) {
r.get();
}
}
}

void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); }
void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) {
opened_.store(false);
// Each rocksdb instance loads its own checkpoint.
auto checkpoint_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::IsDir(checkpoint_path)) {
WARN("Checkpoint dir {} does not exist!", checkpoint_path);
return;
}
if (0 != pstd::IsDir(db_path_)) {
if (0 != pstd::CreateDir(db_path_)) {
WARN("Create dir {} fail !", db_path_);
return;
}
}

std::lock_guard<std::shared_mutex> lock(storage_mutex_);
std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, load the checkpoint for the specified rocksdb instance i.
// The exclusive lock on storage_mutex_ taken above protects the Storage while
// every instance is reloaded.
auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
result.push_back(std::move(res));
}
for (auto& r : result) {
r.get();
}
// Restart the storage: rebuild the options and reopen it.
storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
storage_options.options.ttl = g_config.rocksdb_ttl_second;
storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second;
if (g_config.use_raft) {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
}
storage_ = std::make_unique<storage::Storage>();

if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
ERROR("Storage open failed! {}", s.ToString());
abort();
}
opened_.store(true);
INFO("DB{} load a checkpoint from {} success!", db_index_, path);
}
} // namespace pikiwidb
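The new checkpoint path in db.cc replaces the dedicated CheckpointManager threads with a plain std::async fan-out/join. The standalone sketch below is not part of this commit; the function name, path, and instance count are illustrative. It shows the idiom DB::CreateCheckpoint and DB::LoadDBFromCheckPoint now rely on: launch one task per rocksdb instance, then block on the futures when synchronous behaviour is requested.

#include <cstdio>
#include <future>
#include <string>
#include <vector>

// Stand-in for DB::DoCheckpoint / DB::LoadCheckpoint: handles one rocksdb instance.
void do_checkpoint(const std::string& path, int i) {
  std::printf("checkpointing instance %d under %s\n", i, path.c_str());
}

int main() {
  const std::string path = "/tmp/checkpoint";  // illustrative path
  const int rocksdb_inst_num = 3;              // illustrative instance count
  const bool sync = true;

  std::vector<std::future<void>> result;
  result.reserve(rocksdb_inst_num);
  for (int i = 0; i < rocksdb_inst_num; ++i) {
    // One asynchronous task per rocksdb instance, as in DB::CreateCheckpoint.
    result.push_back(std::async(std::launch::async, do_checkpoint, path, i));
  }
  if (sync) {
    // The sync flag in the commit decides whether the caller blocks here.
    for (auto& r : result) {
      r.get();
    }
  }
  return 0;
}

In the commit the same pattern appears twice: CreateCheckpoint(path, sync) launches DoCheckpoint per instance, and LoadDBFromCheckPoint always joins before reopening the storage.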
23 changes: 10 additions & 13 deletions src/db.h
@@ -7,15 +7,14 @@

#pragma once

#include <filesystem>
#include "storage/storage.h"

namespace pikiwidb {

class CheckpointManager;

class DB {
public:
DB(int db_index, const std::string& db_path);
DB(int db_index, const std::string& db_path, int rocksdb_inst_num);

std::unique_ptr<storage::Storage>& GetStorage() { return storage_; }

@@ -27,20 +26,20 @@ class DB {

void UnLockShared() { storage_mutex_.unlock_shared(); }

void CreateCheckpoint(const std::string& path);

[[maybe_unused]] void DoBgSave(const std::string&, int i);
void CreateCheckpoint(const std::string& path, bool sync);

void WaitForCheckpointDone();
void LoadDBFromCheckPoint(const std::string& path, bool sync = false);

int GetDbIndex() { return db_index_; }

private:
void DoCheckpoint(const std::string&, int i);
void LoadCheckpoint(const std::string&, const std::string& db_path, int i);

private:
const int db_index_ = 0;
const std::string db_path_;
const std::string dump_parent_path_;
const std::string dump_path_;

int rocksdb_inst_num_ = 0;
/**
* If you want to change the pointer that points to storage,
* you must first acquire a mutex lock.
@@ -49,9 +48,7 @@ */
*/
std::shared_mutex storage_mutex_;
std::unique_ptr<storage::Storage> storage_;
bool opened_ = false;

std::unique_ptr<CheckpointManager> checkpoint_manager_;
std::atomic_bool opened_ = false;
};

} // namespace pikiwidb
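The comment above storage_mutex_ describes a reader/writer discipline: code that only uses the storage takes the shared lock (as DoCheckpoint does), while code that replaces the storage_ pointer (as LoadDBFromCheckPoint does) takes the exclusive lock. A minimal self-contained sketch of that discipline, with a placeholder Storage type standing in for storage::Storage and illustrative function names:

#include <memory>
#include <mutex>
#include <shared_mutex>

struct Storage {};  // placeholder for storage::Storage

std::shared_mutex storage_mutex_;
std::unique_ptr<Storage> storage_ = std::make_unique<Storage>();

// Reader path (e.g. checkpointing one instance): many of these may run at once.
void use_storage() {
  std::shared_lock sharedLock(storage_mutex_);
  // ... read through storage_ without replacing the pointer ...
}

// Writer path (e.g. reloading from a checkpoint): excludes all readers.
void replace_storage() {
  std::lock_guard<std::shared_mutex> lock(storage_mutex_);
  storage_ = std::make_unique<Storage>();  // safe: no shared lock is held now
}

int main() {
  use_storage();
  replace_storage();
  return 0;
}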
61 changes: 16 additions & 45 deletions src/praft/praft.cc
@@ -115,7 +115,6 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
server_.reset();
return ERROR_LOG_AND_STATUS("Failed to start server");
}

// It's ok to start PRaft;
assert(group_id.size() == RAFT_GROUPID_LEN);
this->group_id_ = group_id;
@@ -580,7 +579,7 @@ void PRaft::AppendLog(const Binlog& log, std::promise<rocksdb::Status>&& promise
node_->apply(task);
}

void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) {
int PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) {
assert(writer);
for (const auto& entry : std::filesystem::directory_iterator(dir)) {
if (entry.is_directory()) {
@@ -591,34 +590,12 @@ void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter*
} else {
DEBUG("file_path = {}", std::filesystem::relative(entry.path(), path).string());
if (writer->add_file(std::filesystem::relative(entry.path(), path)) != 0) {
ERROR("add file error!");
ERROR("add file {} to snapshot fail!", entry.path().string());
return -1;
}
}
}
}

void PRaft::RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination) {
if (std::filesystem::is_regular_file(source)) {
if (source.filename() == PBRAFT_SNAPSHOT_META_FILE) {
return;
} else if (source.extension() == ".sst") {
// Create a hard link
DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
::link(source.c_str(), destination.c_str());
} else {
// Copy the file
DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing);
}
} else {
if (!pstd::FileExists(destination)) {
pstd::CreateDir(destination);
}

for (const auto& entry : std::filesystem::directory_iterator(source)) {
RecursiveCopy(entry.path(), destination / entry.path().filename());
}
}
return 0;
}

// @braft::StateMachine
@@ -663,30 +640,24 @@ void PRaft::on_apply(braft::Iterator& iter) {
}

void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
assert(writer);
brpc::ClosureGuard done_guard(done);
TasksVector tasks;
tasks.reserve(g_config.databases);
for (auto i = 0; i < g_config.databases; ++i) {
tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, writer->get_path()}}});
}
auto path = writer->get_path();
INFO("Saving snapshot to {}", path);
TasksVector tasks(1, {TaskType::kCheckpoint, db_id_, {{TaskArg::kCheckpointPath, path}}, true});
PSTORE.DoSomeThingSpecificDB(tasks);
PSTORE.WaitForCheckpointDone();
auto writer_path = writer->get_path();
AddAllFiles(writer_path, writer, writer_path);
if (auto res = AddAllFiles(path, writer, path); res != 0) {
done->status().set_error(EIO, "Fail to add file to writer");
}
}

int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
CHECK(!IsLeader()) << "Leader is not supposed to load snapshot";
auto reader_path = reader->get_path(); // xx/snapshot_0000001
auto db_path = g_config.dbpath;
PSTORE.Clear();
for (int i = 0; i < g_config.databases; i++) {
auto sub_path = db_path + std::to_string(i);
pstd::DeleteDirIfExist(sub_path);
}
db_path.pop_back();
RecursiveCopy(reader_path, db_path);
PSTORE.Init();
assert(reader);
auto reader_path = reader->get_path(); // xx/snapshot_0000001
auto path = g_config.dbpath + std::to_string(db_id_); // db/db_id
TasksVector tasks(1, {TaskType::kLoadDBFromCheckPoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
PSTORE.DoSomeThingSpecificDB(tasks);
return 0;
}
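AddAllFiles now reports failure to the caller so that on_snapshot_save can mark the closure with EIO. Setting the braft specifics aside, its path handling is plain std::filesystem: each file is registered with the writer under its path relative to the snapshot directory. A rough sketch of that traversal follows; the recursion into subdirectories is an assumption (that branch is collapsed in the diff), std::cout stands in for writer->add_file, and the path is illustrative.

#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

// Walk `dir` and print each regular file's path relative to `base`,
// mirroring how AddAllFiles feeds relative paths to the snapshot writer.
int AddAllFilesSketch(const fs::path& dir, const fs::path& base) {
  for (const auto& entry : fs::directory_iterator(dir)) {
    if (entry.is_directory()) {
      if (AddAllFilesSketch(entry.path(), base) != 0) {  // assumed recursion
        return -1;
      }
    } else {
      // In the real code this is writer->add_file(relative_path).
      std::cout << fs::relative(entry.path(), base).string() << '\n';
    }
  }
  return 0;
}

int main() {
  const fs::path snapshot_dir = "/tmp/snapshot_0000001";  // illustrative path
  if (!fs::is_directory(snapshot_dir)) {
    std::cerr << snapshot_dir << " does not exist\n";
    return 1;
  }
  return AddAllFilesSketch(snapshot_dir, snapshot_dir);
}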

6 changes: 2 additions & 4 deletions src/praft/praft.h
@@ -22,7 +22,6 @@

namespace pikiwidb {

#define PBRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta"
#define RAFT_GROUPID_LEN 32

#define OK "+OK"
@@ -156,9 +155,7 @@ class PRaft : public braft::StateMachine {
void on_start_following(const ::braft::LeaderChangeContext& ctx) override;

private:
void AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path);

void RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination);
static int AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path);

private:
std::unique_ptr<brpc::Server> server_{nullptr}; // brpc
@@ -168,6 +165,7 @@

ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command
std::string group_id_; // group id
int db_id_ = 0; // db_id
};

} // namespace pikiwidb