Skip to content

Commit

Permalink
feat: Snapshot save & load (#238)
Browse files Browse the repository at this point in the history
snapshot save & load
  • Loading branch information
dingxiaoshuai123 authored Apr 22, 2024
1 parent 273a2d9 commit 3411495
Show file tree
Hide file tree
Showing 21 changed files with 345 additions and 193 deletions.
4 changes: 2 additions & 2 deletions pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 3
databases 1

################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -347,7 +347,7 @@ backendpath dump
# the frequency of dump to backend per second
backendhz 10
# the number of rocksdb instances per db
db-instance-num 5
db-instance-num 3
# default 86400 * 7
rocksdb-ttl-second 604800
# default 86400 * 3
Expand Down
19 changes: 19 additions & 0 deletions save_load.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
# Smoke test for raft snapshot save & load:
#   1. start a leader (port 7777) and a follower (port 8888),
#   2. init the raft cluster and write load via redis-benchmark,
#   3. trigger two snapshots on the leader,
#   4. join the follower, which must catch up from the snapshot.
# NOTE(review): follower2/ is created but never used below — presumably
# reserved for a second follower; confirm or drop it.

killall -9 pikiwidb
# -p: do not fail when the directories already exist from a previous run.
mkdir -p leader follower1 follower2

# Each "cd ... && ... &" list runs in a background subshell, so the
# script's own working directory is unchanged between the two starts.
cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 &

cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 &
sleep 10
redis-cli -p 7777 raft.cluster init
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t hset


redis-cli -p 7777 raft.node DOSNAPSHOT
redis-cli -p 7777 raft.node DOSNAPSHOT

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
45 changes: 0 additions & 45 deletions src/checkpoint_manager.cc

This file was deleted.

37 changes: 0 additions & 37 deletions src/checkpoint_manager.h

This file was deleted.

3 changes: 1 addition & 2 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ enum ClientFlag {
kClientFlagMaster = (1 << 3),
};

class DB;
struct PSlaveInfo;

class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
Expand Down Expand Up @@ -251,4 +250,4 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

static thread_local PClient* s_current;
};
} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 1 addition & 1 deletion src/cmd_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
*/

#include "cmd_admin.h"

#include "braft/raft.h"
#include "rocksdb/version.h"

#include "praft/praft.h"
#include "store.h"

namespace pikiwidb {

Expand Down
17 changes: 14 additions & 3 deletions src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <optional>
#include <string>

#include "net/event_loop.h"
#include "praft/praft.h"
#include "pstd/log.h"
#include "pstd/pstd_string.h"
Expand All @@ -28,8 +29,9 @@ RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity)
bool RaftNodeCmd::DoInitial(PClient* client) {
auto cmd = client->argv_[1];
pstd::StringToUpper(cmd);
if (cmd != kAddCmd && cmd != kRemoveCmd) {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only");

if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only");
return false;
}
return true;
Expand All @@ -40,8 +42,10 @@ void RaftNodeCmd::DoCmd(PClient* client) {
pstd::StringToUpper(cmd);
if (cmd == kAddCmd) {
DoCmdAdd(client);
} else {
} else if (cmd == kRemoveCmd) {
DoCmdRemove(client);
} else {
DoCmdSnapshot(client);
}
}

Expand Down Expand Up @@ -115,6 +119,13 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
}
}

// Handle RAFT.NODE DOSNAPSHOT: trigger a snapshot on the local raft node.
// Replies +OK on success. Previously a failed snapshot sent NO reply at
// all, leaving the client hanging; now an error is returned instead.
void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
  auto s = PRAFT.DoSnapshot();
  if (s.ok()) {
    client->SetRes(CmdRes::kOK);
  } else {
    // NOTE(review): assumes butil::Status::error_str() — confirm the exact
    // accessor on the status type returned by PRAFT::DoSnapshot().
    client->SetRes(CmdRes::kErrOther, s.error_str());
  }
}

RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {}

Expand Down
4 changes: 3 additions & 1 deletion src/cmd_raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ class RaftNodeCmd : public BaseCmd {
void DoCmd(PClient *client) override;
void DoCmdAdd(PClient *client);
void DoCmdRemove(PClient *client);
void DoCmdSnapshot(PClient *client);

static constexpr std::string_view kAddCmd = "ADD";
static constexpr std::string_view kRemoveCmd = "REMOVE";
static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT";
};

/* RAFT.CLUSTER INIT <id>
Expand Down Expand Up @@ -78,4 +80,4 @@ class RaftClusterCmd : public BaseCmd {
static constexpr std::string_view kJoinCmd = "JOIN";
};

} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "cmd_table_manager.h"

#include <memory>

#include "cmd_admin.h"
#include "cmd_hash.h"
#include "cmd_keys.h"
Expand Down
100 changes: 84 additions & 16 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@

#include "db.h"

#include "config.h"
#include "praft/praft.h"
#include "pstd/log.h"

#include "checkpoint_manager.h"
#include "config.h"

extern pikiwidb::PConfig g_config;

namespace pikiwidb {

DB::DB(int db_index, const std::string& db_path)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {
DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) {
storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = g_config.db_instance_num;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
Expand All @@ -38,24 +36,94 @@ DB::DB(int db_index, const std::string& db_path)
ERROR("Storage open failed! {}", s.ToString());
abort();
}

checkpoint_manager_ = std::make_unique<CheckpointManager>();
checkpoint_manager_->Init(g_config.db_instance_num, this);

opened_ = true;
opened_.store(true);
INFO("Open DB{} success!", db_index_);
}

void DB::DoBgSave(const std::string& path, int i) {
// 1) always hold storage's sharedLock
// Create an on-disk checkpoint of rocksdb instance `i` under `path`.
// Runs on a worker thread spawned by DB::CreateCheckpoint.
void DB::DoCheckpoint(const std::string& path, int i) {
// 1) Always hold the storage's shared lock so storage_ cannot be replaced
//    (e.g. by a concurrent checkpoint load) while the checkpoint is written.
std::shared_lock sharedLock(storage_mutex_);

// 2) Create the checkpoint of rocksdb instance i.
// NOTE(review): the returned status is discarded — a failed checkpoint is
// silent here; consider logging it.
auto status = storage_->CreateCheckpoint(path, i);
}

// Load the checkpoint of rocksdb instance `i` from `path` into `db_path`.
// Runs on a worker thread spawned by DB::LoadDBFromCheckpoint.
void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) {
  // 1) The caller (DB::LoadDBFromCheckpoint) already holds the storage's
  //    exclusive lock, so no lock is taken here.

  // 2) Load the checkpoint of rocksdb instance i.
  // NOTE(review): the returned status is discarded — a failed load is
  // silent here; consider logging or propagating it.
  auto status = storage_->LoadCheckpoint(path, db_path, i);
}

// Create a checkpoint of every rocksdb instance of this DB under
// `path/<db_index_>`. When `sync` is true, block until all per-instance
// checkpoints have finished; otherwise fire-and-forget.
void DB::CreateCheckpoint(const std::string& path, bool sync) {
auto tmp_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::CreatePath(tmp_path)) {
WARN("Create dir {} fail !", tmp_path);
return;
}

std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, create a checkpoint for the specified rocksdb i.
// DB::DoCheckpoint always holds a shared lock on storage_mutex_ to
// protect the Storage corresponding to this rocksdb i.
auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i);
result.push_back(std::move(res));
}
if (sync) {
for (auto& r : result) {
r.get();
}
}
}

// Replace this DB's storage with the checkpoint found under
// `path/<db_index_>`: load every rocksdb instance's checkpoint in
// parallel, then reopen the Storage on db_path_.
// NOTE(review): `sync` is currently unused — the function always waits
// for every load before reopening; confirm whether an async mode was
// intended.
void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {
  // Mark the DB closed for the duration of the swap. NOTE(review): every
  // early return below leaves the DB closed — presumably intentional
  // (a half-loaded DB must not serve), but worth confirming.
  opened_.store(false);
  auto checkpoint_path = path + '/' + std::to_string(db_index_);
  // BUG FIX: previously checked IsDir(path), but it is the per-DB
  // checkpoint dir that is about to be loaded (and that the warning
  // message already names), so a missing checkpoint dir slipped through.
  if (0 != pstd::IsDir(checkpoint_path)) {
    WARN("Checkpoint dir {} does not exist!", checkpoint_path);
    return;
  }
  // Make sure the destination db dir exists before loading into it.
  if (0 != pstd::IsDir(db_path_)) {
    if (0 != pstd::CreateDir(db_path_)) {
      WARN("Create dir {} fail !", db_path_);
      return;
    }
  }

  // Exclusive lock: no checkpoint creation may run while storage_ is
  // being torn down and rebuilt.
  std::lock_guard<std::shared_mutex> lock(storage_mutex_);
  std::vector<std::future<void>> result;
  result.reserve(rocksdb_inst_num_);
  for (int i = 0; i < rocksdb_inst_num_; ++i) {
    // In a new thread, load the checkpoint for the specified rocksdb i.
    auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
    result.push_back(std::move(res));
  }
  for (auto& r : result) {
    r.get();
  }

  // Rebuild the Storage options exactly as the constructor does.
  storage::StorageOptions storage_options;
  storage_options.options.create_if_missing = true;
  storage_options.db_instance_num = rocksdb_inst_num_;
  storage_options.db_id = db_index_;

  // options for CF
  storage_options.options.ttl = g_config.rocksdb_ttl_second;
  storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second;
  if (g_config.use_raft) {
    storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
      r.AppendLog(log, std::move(promise));
    };
  }
  storage_ = std::make_unique<storage::Storage>();

  if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
    ERROR("Storage open failed! {}", s.ToString());
    abort();
  }
  opened_.store(true);
  INFO("DB{} load a checkpoint from {} success!", db_index_, path);
}
} // namespace pikiwidb
Loading

0 comments on commit 3411495

Please sign in to comment.