Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Snapshot save & load #238

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 3
databases 1

################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -347,7 +347,7 @@ backendpath dump
# the frequency of dump to backend per second
backendhz 10
# the rocksdb number per db
db-instance-num 5
db-instance-num 3
# default 86400 * 7
rocksdb-ttl-second 604800
# default 86400 * 3
Expand Down
19 changes: 19 additions & 0 deletions save_load.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
killall -9 pikiwidb
mkdir leader follower1 follower2

cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 &

cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 &
sleep 10
redis-cli -p 7777 raft.cluster init
redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t hset


redis-cli -p 7777 raft.node DOSNAPSHOT
redis-cli -p 7777 raft.node DOSNAPSHOT

redis-cli -p 8888 raft.cluster join 127.0.0.1:7777



45 changes: 0 additions & 45 deletions src/checkpoint_manager.cc

This file was deleted.

37 changes: 0 additions & 37 deletions src/checkpoint_manager.h

This file was deleted.

3 changes: 1 addition & 2 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ enum ClientFlag {
kClientFlagMaster = (1 << 3),
};

class DB;
struct PSlaveInfo;

class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
Expand Down Expand Up @@ -251,4 +250,4 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {

static thread_local PClient* s_current;
};
} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 1 addition & 1 deletion src/cmd_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
*/

#include "cmd_admin.h"

#include "braft/raft.h"
#include "rocksdb/version.h"

#include "praft/praft.h"
#include "store.h"

namespace pikiwidb {

Expand Down
17 changes: 14 additions & 3 deletions src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <optional>
#include <string>

#include "net/event_loop.h"
#include "praft/praft.h"
#include "pstd/log.h"
#include "pstd/pstd_string.h"
Expand All @@ -28,8 +29,9 @@ RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity)
bool RaftNodeCmd::DoInitial(PClient* client) {
auto cmd = client->argv_[1];
pstd::StringToUpper(cmd);
if (cmd != kAddCmd && cmd != kRemoveCmd) {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only");

if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) {
client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only");
return false;
}
return true;
Expand All @@ -40,8 +42,10 @@ void RaftNodeCmd::DoCmd(PClient* client) {
pstd::StringToUpper(cmd);
if (cmd == kAddCmd) {
DoCmdAdd(client);
} else {
} else if (cmd == kRemoveCmd) {
DoCmdRemove(client);
} else {
DoCmdSnapshot(client);
}
}

Expand Down Expand Up @@ -115,6 +119,13 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
}
}

void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
auto s = PRAFT.DoSnapshot();
if (s.ok()) {
client->SetRes(CmdRes::kOK);
}
}

RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity)
: BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {}

Expand Down
4 changes: 3 additions & 1 deletion src/cmd_raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ class RaftNodeCmd : public BaseCmd {
void DoCmd(PClient *client) override;
void DoCmdAdd(PClient *client);
void DoCmdRemove(PClient *client);
void DoCmdSnapshot(PClient *client);

static constexpr std::string_view kAddCmd = "ADD";
static constexpr std::string_view kRemoveCmd = "REMOVE";
static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT";
};

/* RAFT.CLUSTER INIT <id>
Expand Down Expand Up @@ -78,4 +80,4 @@ class RaftClusterCmd : public BaseCmd {
static constexpr std::string_view kJoinCmd = "JOIN";
};

} // namespace pikiwidb
} // namespace pikiwidb
2 changes: 2 additions & 0 deletions src/cmd_table_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include "cmd_table_manager.h"

#include <memory>

#include "cmd_admin.h"
#include "cmd_hash.h"
#include "cmd_keys.h"
Expand Down
100 changes: 84 additions & 16 deletions src/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@

#include "db.h"

#include "config.h"
#include "praft/praft.h"
#include "pstd/log.h"

#include "checkpoint_manager.h"
#include "config.h"

extern pikiwidb::PConfig g_config;

namespace pikiwidb {

DB::DB(int db_index, const std::string& db_path)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {
DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
: db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) {
storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = g_config.db_instance_num;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
Expand All @@ -38,24 +36,94 @@ DB::DB(int db_index, const std::string& db_path)
ERROR("Storage open failed! {}", s.ToString());
abort();
}

checkpoint_manager_ = std::make_unique<CheckpointManager>();
checkpoint_manager_->Init(g_config.db_instance_num, this);

opened_ = true;
opened_.store(true);
INFO("Open DB{} success!", db_index_);
}

void DB::DoBgSave(const std::string& path, int i) {
// 1) always hold storage's sharedLock
void DB::DoCheckpoint(const std::string& path, int i) {
// 1) always hold the storage's shared lock
std::shared_lock sharedLock(storage_mutex_);

// 2)Create the storage's checkpoint
// 2)Create the checkpoint of rocksdb i.
auto status = storage_->CreateCheckpoint(path, i);
}

void DB::CreateCheckpoint(const std::string& path) { checkpoint_manager_->CreateCheckpoint(path); }
void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) {
// 1) Already holding the storage's exclusion lock

void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); }
// 2) Load the checkpoint of rocksdb i.
auto status = storage_->LoadCheckpoint(path, db_path, i);
}

void DB::CreateCheckpoint(const std::string& path, bool sync) {
auto tmp_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::CreatePath(tmp_path)) {
WARN("Create dir {} fail !", tmp_path);
return;
}

std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, create a checkpoint for the specified rocksdb i
// In DB::DoBgSave, a read lock is always held to protect the Storage
// corresponding to this rocksdb i.
auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i);
result.push_back(std::move(res));
}
if (sync) {
for (auto& r : result) {
r.get();
}
}
}

void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {
opened_.store(false);
auto checkpoint_path = path + '/' + std::to_string(db_index_);
if (0 != pstd::IsDir(path)) {
WARN("Checkpoint dir {} does not exist!", checkpoint_path);
return;
}
if (0 != pstd::IsDir(db_path_)) {
if (0 != pstd::CreateDir(db_path_)) {
WARN("Create dir {} fail !", db_path_);
return;
}
}

std::lock_guard<std::shared_mutex> lock(storage_mutex_);
std::vector<std::future<void>> result;
result.reserve(rocksdb_inst_num_);
for (int i = 0; i < rocksdb_inst_num_; ++i) {
// In a new thread, Load a checkpoint for the specified rocksdb i
auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
result.push_back(std::move(res));
}
for (auto& r : result) {
r.get();
}

storage::StorageOptions storage_options;
storage_options.options.create_if_missing = true;
storage_options.db_instance_num = rocksdb_inst_num_;
storage_options.db_id = db_index_;

// options for CF
storage_options.options.ttl = g_config.rocksdb_ttl_second;
storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second;
if (g_config.use_raft) {
storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
r.AppendLog(log, std::move(promise));
};
}
storage_ = std::make_unique<storage::Storage>();

if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
ERROR("Storage open failed! {}", s.ToString());
abort();
}
opened_.store(true);
INFO("DB{} load a checkpoint from {} success!", db_index_, path);
}
} // namespace pikiwidb
Loading
Loading