Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Snapshot save & load #238

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions save_load.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
killall -9 pikiwidb
mkdir leader follower1 follower2

cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 &

cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 &
sleep 10
redis-cli -p 7777 raft.cluster init
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t hset


redis-cli -p 7777 raft.node DOSNAPSHOT
redis-cli -p 7777 raft.node DOSNAPSHOT

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777



45 changes: 0 additions & 45 deletions src/checkpoint_manager.cc

This file was deleted.

37 changes: 0 additions & 37 deletions src/checkpoint_manager.h

This file was deleted.

3 changes: 1 addition & 2 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ enum ClientFlag {
kClientFlagMaster = (1 << 3),
};

class DB;
struct PSlaveInfo;

class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
Expand Down Expand Up @@ -251,4 +250,4 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

static thread_local PClient* s_current;
};
} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 1 addition & 1 deletion src/cmd_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
*/

#include "cmd_admin.h"

#include "braft/raft.h"
#include "rocksdb/version.h"

#include "praft/praft.h"
#include "store.h"

namespace pikiwidb {

Expand Down
17 changes: 14 additions & 3 deletions src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <optional>
#include <string>

#include "net/event_loop.h"
#include "praft/praft.h"
#include "pstd/log.h"
#include "pstd/pstd_string.h"
Expand All @@ -28,8 +29,9 @@ RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity)
bool RaftNodeCmd::DoInitial(PClient* client) {
auto cmd = client->argv_[1];
pstd::StringToUpper(cmd);
if (cmd != kAddCmd && cmd != kRemoveCmd) {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only");

if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only");
return false;
}
return true;
Expand All @@ -40,8 +42,10 @@ void RaftNodeCmd::DoCmd(PClient* client) {
pstd::StringToUpper(cmd);
if (cmd == kAddCmd) {
DoCmdAdd(client);
} else {
} else if (cmd == kRemoveCmd) {
DoCmdRemove(client);
} else {
DoCmdSnapshot(client);
}
}

Expand Down Expand Up @@ -115,6 +119,13 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
}
}

void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
auto s = PRAFT.DoSnapshot();
if (s.ok()) {
client->SetRes(CmdRes::kOK);
}
}

RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {}

Expand Down
4 changes: 3 additions & 1 deletion src/cmd_raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ class RaftNodeCmd : public BaseCmd {
void DoCmd(PClient *client) override;
void DoCmdAdd(PClient *client);
void DoCmdRemove(PClient *client);
void DoCmdSnapshot(PClient *client);

static constexpr std::string_view kAddCmd = "ADD";
static constexpr std::string_view kRemoveCmd = "REMOVE";
static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT";
};

/* RAFT.CLUSTER INIT <id>
Expand Down Expand Up @@ -78,4 +80,4 @@ class RaftClusterCmd : public BaseCmd {
static constexpr std::string_view kJoinCmd = "JOIN";
};

} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "cmd_table_manager.h"

#include <memory>

#include "cmd_admin.h"
#include "cmd_hash.h"
#include "cmd_keys.h"
Expand Down
100 changes: 85 additions & 15 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@

#include "db.h"

#include "config.h"
#include "praft/praft.h"
#include "pstd/log.h"

#include "checkpoint_manager.h"
#include "config.h"

extern pikiwidb::PConfig g_config;

namespace pikiwidb {

DB::DB(int db_index, const std::string& db_path)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {
DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) {
storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = g_config.db_instance_num;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
Expand All @@ -38,24 +36,96 @@ DB::DB(int db_index, const std::string& db_path)
ERROR("Storage open failed! {}", s.ToString());
abort();
}

checkpoint_manager_ = std::make_unique<CheckpointManager>();
checkpoint_manager_->Init(g_config.db_instance_num, this);

opened_ = true;
opened_.store(true);
INFO("Open DB{} success!", db_index_);
}

void DB::DoBgSave(const std::string& path, int i) {
void DB::DoCheckpoint(const std::string& path, int i) {
// 1) always hold storage's sharedLock
std::shared_lock sharedLock(storage_mutex_);

// 2)Create the storage's checkpoint
// 2)Create the checkpoint of rocksdb i.
auto status = storage_->CreateCheckpoint(path, i);
}

void DB::CreateCheckpoint(const std::string& path) { checkpoint_manager_->CreateCheckpoint(path); }
void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) {
// 1) Already holding the mutual exclusion lock

void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); }
// 2) Load the checkpoint of rocksdb i.
auto status = storage_->LoadCheckpoint(path, db_path, i);
}

void DB::CreateCheckpoint(const std::string& path, bool sync) {
auto tmp_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::CreatePath(tmp_path)) {
WARN("Create dir {} fail !", tmp_path);
return;
}

std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, create a checkpoint for the specified rocksdb i
// In DB::DoBgSave, a read lock is always held to protect the Storage
// corresponding to this rocksdb i.
auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i);
result.push_back(std::move(res));
}
if (sync) {
for (auto& r : result) {
r.get();
}
}
}

void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) {
dingxiaoshuai123 marked this conversation as resolved.
Show resolved Hide resolved
opened_.store(false);
auto checkpoint_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::IsDir(path)) {
WARN("Checkpoint dir {} does not exist!", checkpoint_path);
return;
}
if (0 != pstd::IsDir(db_path_)) {
if (0 != pstd::CreateDir(db_path_)) {
WARN("Create dir {} fail !", db_path_);
return;
}
}

std::lock_guard<std::shared_mutex> lock(storage_mutex_);
std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, Load a checkpoint for the specified rocksdb i
// In DB::DoBgSave, a read lock is always held to protect the Storage
// corresponding to this rocksdb i.
auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
result.push_back(std::move(res));
}
for (auto& r : result) {
r.get();
}

storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
storage_options.options.ttl = g_config.rocksdb_ttl_second;
storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second;
if (g_config.use_raft) {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
}
storage_ = std::make_unique<storage::Storage>();

if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
ERROR("Storage open failed! {}", s.ToString());
abort();
}
opened_.store(true);
INFO("DB{} load a checkpoint from {} success!", db_index_, path);
}
} // namespace pikiwidb
23 changes: 10 additions & 13 deletions src/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@

#pragma once

#include <filesystem>
#include "storage/storage.h"

namespace pikiwidb {

class CheckpointManager;

class DB {
public:
DB(int db_index, const std::string& db_path);
DB(int db_index, const std::string& db_path, int rocksdb_inst_num);

std::unique_ptr<storage::Storage>& GetStorage() { return storage_; }

Expand All @@ -27,20 +26,20 @@ class DB {

void UnLockShared() { storage_mutex_.unlock_shared(); }

void CreateCheckpoint(const std::string& path);

[[maybe_unused]] void DoBgSave(const std::string&, int i);
void CreateCheckpoint(const std::string& path, bool sync);

void WaitForCheckpointDone();
void LoadDBFromCheckPoint(const std::string& path, bool sync = false);

int GetDbIndex() { return db_index_; }

private:
void DoCheckpoint(const std::string&, int i);
void LoadCheckpoint(const std::string&, const std::string& db_path, int i);

private:
const int db_index_ = 0;
const std::string db_path_;
const std::string dump_parent_path_;
const std::string dump_path_;

int rocksdb_inst_num_ = 0;
/**
* If you want to change the pointer that points to storage,
* you must first acquire a mutex lock.
Expand All @@ -49,9 +48,7 @@ class DB {
*/
std::shared_mutex storage_mutex_;
std::unique_ptr<storage::Storage> storage_;
bool opened_ = false;

std::unique_ptr<CheckpointManager> checkpoint_manager_;
std::atomic_bool opened_ = false;
};

} // namespace pikiwidb
Loading
Loading