diff --git a/pikiwidb.conf b/pikiwidb.conf
index 55fe6d3d2..b375faeb0 100644
--- a/pikiwidb.conf
+++ b/pikiwidb.conf
@@ -38,7 +38,7 @@ logfile stdout
 # Set the number of databases. The default database is DB 0, you can select
 # a different one on a per-connection basis using SELECT <dbid> where
 # dbid is a number between 0 and 'databases'-1
-databases 3
+databases 1
 
 ################################ SNAPSHOTTING #################################
 #
@@ -347,7 +347,7 @@ backendpath dump
 # the frequency of dump to backend per second
 backendhz 10
 # the rocksdb number per db
-db-instance-num 5
+db-instance-num 3
 # default 86400 * 7
 rocksdb-ttl-second 604800
 # default 86400 * 3
diff --git a/save_load.sh b/save_load.sh
new file mode 100755
index 000000000..b2b6fd836
--- /dev/null
+++ b/save_load.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+killall -9 pikiwidb
+mkdir leader follower1 follower2
+
+cd leader && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 7777 &
+
+cd follower1 && ulimit -n 99999 && rm -fr * && ../bin/pikiwidb ../pikiwidb.conf --port 8888 &
+sleep 10
+redis-cli -p 7777 raft.cluster init
+redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t hset
+
+
+redis-cli -p 7777 raft.node DOSNAPSHOT
+redis-cli -p 7777 raft.node DOSNAPSHOT
+
+redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
+
+
+
diff --git a/src/checkpoint_manager.cc b/src/checkpoint_manager.cc
deleted file mode 100644
index a29211e6c..000000000
--- a/src/checkpoint_manager.cc
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- */
-
-#include "checkpoint_manager.h"
-#include "db.h"
-#include "log.h"
-#include "pstd/env.h"
-
-namespace pikiwidb {
-
-void CheckpointManager::Init(int instNum, DB* db) {
-  checkpoint_num_ = instNum;
-  res_.reserve(checkpoint_num_);
-  db_ = db;
-}
-
-void CheckpointManager::CreateCheckpoint(const std::string& path) {
-  res_.clear();
-
-  if (!pstd::FileExists(path)) {
-    if (0 != pstd::CreatePath(path)) {
-      WARN("Create Dir {} fail!", path);
-      return;
-    }
-    INFO("Create Dir {} success!", path);
-  }
-
-  std::lock_guard Lock(shared_mutex_);
-  for (int i = 0; i < checkpoint_num_; ++i) {
-    auto res = std::async(std::launch::async, &DB::DoBgSave, db_, path, i);
-    res_.push_back(std::move(res));
-  }
-}
-
-void CheckpointManager::WaitForCheckpointDone() {
-  for (auto& r : res_) {
-    r.get();
-  }
-}
-
-}  // namespace pikiwidb
diff --git a/src/checkpoint_manager.h b/src/checkpoint_manager.h
deleted file mode 100644
index 065027424..000000000
--- a/src/checkpoint_manager.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- */
-
-#pragma once
-
-#include <future>
-#include <shared_mutex>
-#include <vector>
-
-namespace pikiwidb {
-
-class DB;
-
-class CheckpointManager {
- public:
-  CheckpointManager() = default;
-  ~CheckpointManager() = default;
-
-  void Init(int instNum, DB* db);
-
-  void CreateCheckpoint(const std::string& path);
-
-  void WaitForCheckpointDone();
-
- private:
-  int checkpoint_num_ = 0;
-  std::vector<std::future<void>> res_;
-  DB* db_ = nullptr;
-
-  std::shared_mutex shared_mutex_;
-};
-
-}  // namespace pikiwidb
diff --git a/src/client.h b/src/client.h
index a890697af..f983b3ef2 100644
--- a/src/client.h
+++ b/src/client.h
@@ -102,7 +102,6 @@ enum ClientFlag {
   kClientFlagMaster = (1 << 3),
 };
 
-class DB;
 struct PSlaveInfo;
 
 class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
@@ -251,4 +250,4 @@ class PClient : public std::enable_shared_from_this<PClient>, public CmdRes {
   static thread_local PClient* s_current;
 };
 
-}  // namespace pikiwidb
\ No newline at end of file
+}  // namespace pikiwidb
diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc
index 1057bd3fc..d65278728 100644
--- a/src/cmd_admin.cc
+++ b/src/cmd_admin.cc
@@ -6,11 +6,11 @@
  */
 
 #include "cmd_admin.h"
-
 #include "braft/raft.h"
 #include "rocksdb/version.h"
 
 #include "praft/praft.h"
+#include "store.h"
 
 namespace pikiwidb {
 
diff --git a/src/cmd_raft.cc b/src/cmd_raft.cc
index ebbef035e..43f9c6af2 100644
--- a/src/cmd_raft.cc
+++ b/src/cmd_raft.cc
@@ -11,6 +11,7 @@
 #include 
 #include 
 
+#include "net/event_loop.h"
 #include "praft/praft.h"
 #include "pstd/log.h"
 #include "pstd/pstd_string.h"
@@ -28,8 +29,9 @@ RaftNodeCmd::RaftNodeCmd(const std::string& name, int16_t arity)
     : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {}
 
 bool RaftNodeCmd::DoInitial(PClient* client) {
   auto cmd = client->argv_[1];
   pstd::StringToUpper(cmd);
-  if (cmd != kAddCmd && cmd != kRemoveCmd) {
-    client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE only");
+
+  if (cmd != kAddCmd && cmd != kRemoveCmd && cmd != kDoSnapshot) {
+    client->SetRes(CmdRes::kErrOther, "RAFT.NODE supports ADD / REMOVE / DOSNAPSHOT only");
     return false;
   }
   return true;
 }
@@ -40,8 +42,10 @@ void RaftNodeCmd::DoCmd(PClient* client) {
   auto cmd = client->argv_[1];
   pstd::StringToUpper(cmd);
   if (cmd == kAddCmd) {
     DoCmdAdd(client);
-  } else {
+  } else if (cmd == kRemoveCmd) {
     DoCmdRemove(client);
+  } else {
+    DoCmdSnapshot(client);
   }
 }
 
@@ -115,6 +119,13 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
   }
 }
 
+void RaftNodeCmd::DoCmdSnapshot(PClient* client) {
+  auto s = PRAFT.DoSnapshot();
+  if (s.ok()) {
+    client->SetRes(CmdRes::kOK);
+  }
+}
+
 RaftClusterCmd::RaftClusterCmd(const std::string& name, int16_t arity)
     : BaseCmd(name, arity, kCmdFlagsRaft, kAclCategoryRaft) {}
 
diff --git a/src/cmd_raft.h b/src/cmd_raft.h
index b9df47e2c..6a4c1f869 100644
--- a/src/cmd_raft.h
+++ b/src/cmd_raft.h
@@ -45,9 +45,11 @@ class RaftNodeCmd : public BaseCmd {
   void DoCmd(PClient *client) override;
   void DoCmdAdd(PClient *client);
   void DoCmdRemove(PClient *client);
+  void DoCmdSnapshot(PClient *client);
 
   static constexpr std::string_view kAddCmd = "ADD";
   static constexpr std::string_view kRemoveCmd = "REMOVE";
+  static constexpr std::string_view kDoSnapshot = "DOSNAPSHOT";
 };
 
 /* RAFT.CLUSTER INIT 
@@ -78,4 +80,4 @@ class RaftClusterCmd : public BaseCmd {
   static constexpr std::string_view kJoinCmd = "JOIN";
 };
 
-}  // namespace pikiwidb
\ No newline at end of file
+}  // namespace pikiwidb
diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc
index edc375596..19bced6dc 100644
--- a/src/cmd_table_manager.cc
+++ b/src/cmd_table_manager.cc
@@ -7,6 +7,8 @@
 
 #include "cmd_table_manager.h"
 
+#include 
+
 #include "cmd_admin.h"
"cmd_hash.h" #include "cmd_keys.h" diff --git a/src/db.cc b/src/db.cc index 122d84b4c..4c002e146 100644 --- a/src/db.cc +++ b/src/db.cc @@ -7,21 +7,19 @@ #include "db.h" +#include "config.h" #include "praft/praft.h" #include "pstd/log.h" -#include "checkpoint_manager.h" -#include "config.h" - extern pikiwidb::PConfig g_config; namespace pikiwidb { -DB::DB(int db_index, const std::string& db_path) - : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') { +DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num) + : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) { storage::StorageOptions storage_options; storage_options.options.create_if_missing = true; - storage_options.db_instance_num = g_config.db_instance_num; + storage_options.db_instance_num = rocksdb_inst_num_; storage_options.db_id = db_index_; // options for CF @@ -38,24 +36,94 @@ DB::DB(int db_index, const std::string& db_path) ERROR("Storage open failed! {}", s.ToString()); abort(); } - - checkpoint_manager_ = std::make_unique(); - checkpoint_manager_->Init(g_config.db_instance_num, this); - - opened_ = true; + opened_.store(true); INFO("Open DB{} success!", db_index_); } -void DB::DoBgSave(const std::string& path, int i) { - // 1) always hold storage's sharedLock +void DB::DoCheckpoint(const std::string& path, int i) { + // 1) always hold the storage's shared lock std::shared_lock sharedLock(storage_mutex_); - // 2)Create the storage's checkpoint 。 + // 2)Create the checkpoint of rocksdb i. auto status = storage_->CreateCheckpoint(path, i); } -void DB::CreateCheckpoint(const std::string& path) { checkpoint_manager_->CreateCheckpoint(path); } +void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) { + // 1) Already holding the storage's exclusion lock -void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); } + // 2) Load the checkpoint of rocksdb i. + auto status = storage_->LoadCheckpoint(path, db_path, i); +} + +void DB::CreateCheckpoint(const std::string& path, bool sync) { + auto tmp_path = path + '/' + std::to_string(db_index_); + if (0 != pstd::CreatePath(tmp_path)) { + WARN("Create dir {} fail !", tmp_path); + return; + } + std::vector> result; + result.reserve(rocksdb_inst_num_); + for (int i = 0; i < rocksdb_inst_num_; ++i) { + // In a new thread, create a checkpoint for the specified rocksdb i + // In DB::DoBgSave, a read lock is always held to protect the Storage + // corresponding to this rocksdb i. 
+    auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i);
+    result.push_back(std::move(res));
+  }
+  if (sync) {
+    for (auto& r : result) {
+      r.get();
+    }
+  }
+}
+
+void DB::LoadDBFromCheckpoint(const std::string& path, bool sync) {
+  opened_.store(false);
+  auto checkpoint_path = path + '/' + std::to_string(db_index_);
+  if (0 != pstd::IsDir(checkpoint_path)) {
+    WARN("Checkpoint dir {} does not exist!", checkpoint_path);
+    return;
+  }
+  if (0 != pstd::IsDir(db_path_)) {
+    if (0 != pstd::CreateDir(db_path_)) {
+      WARN("Create dir {} fail!", db_path_);
+      return;
+    }
+  }
+
+  std::lock_guard lock(storage_mutex_);
+  std::vector<std::future<void>> result;
+  result.reserve(rocksdb_inst_num_);
+  for (int i = 0; i < rocksdb_inst_num_; ++i) {
+    // In a new thread, load a checkpoint for the specified rocksdb i.
+    auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i);
+    result.push_back(std::move(res));
+  }
+  for (auto& r : result) {
+    r.get();
+  }
+
+  storage::StorageOptions storage_options;
+  storage_options.options.create_if_missing = true;
+  storage_options.db_instance_num = rocksdb_inst_num_;
+  storage_options.db_id = db_index_;
+
+  // options for CF
+  storage_options.options.ttl = g_config.rocksdb_ttl_second;
+  storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second;
+  if (g_config.use_raft) {
+    storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise<rocksdb::Status>&& promise) {
+      r.AppendLog(log, std::move(promise));
+    };
+  }
+  storage_ = std::make_unique<storage::Storage>();
+
+  if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) {
+    ERROR("Storage open failed! {}", s.ToString());
+    abort();
+  }
+  opened_.store(true);
+  INFO("DB{} load a checkpoint from {} success!", db_index_, path);
+}
 }  // namespace pikiwidb
diff --git a/src/db.h b/src/db.h
index 11dc3a207..a127e4f16 100644
--- a/src/db.h
+++ b/src/db.h
@@ -7,15 +7,14 @@
 
 #pragma once
 
+#include 
 #include "storage/storage.h"
 
 namespace pikiwidb {
 
-class CheckpointManager;
-
 class DB {
  public:
-  DB(int db_index, const std::string& db_path);
+  DB(int db_index, const std::string& db_path, int rocksdb_inst_num);
 
   std::unique_ptr<storage::Storage>& GetStorage() { return storage_; }
 
@@ -27,20 +26,20 @@
 
   void UnLockShared() { storage_mutex_.unlock_shared(); }
 
-  void CreateCheckpoint(const std::string& path);
-
-  [[maybe_unused]] void DoBgSave(const std::string&, int i);
+  void CreateCheckpoint(const std::string& path, bool sync);
 
-  void WaitForCheckpointDone();
+  void LoadDBFromCheckpoint(const std::string& path, bool sync = false);
 
   int GetDbIndex() { return db_index_; }
 
+ private:
+  void DoCheckpoint(const std::string&, int i);
+  void LoadCheckpoint(const std::string&, const std::string& db_path, int i);
+
 private:
   const int db_index_ = 0;
   const std::string db_path_;
-  const std::string dump_parent_path_;
-  const std::string dump_path_;
-
+  int rocksdb_inst_num_ = 0;
  /**
   * If you want to change the pointer that points to storage,
   * you must first acquire a mutex lock.
@@ -49,9 +48,7 @@
   */
  std::shared_mutex storage_mutex_;
  std::unique_ptr<storage::Storage> storage_;
-  bool opened_ = false;
-
-  std::unique_ptr<CheckpointManager> checkpoint_manager_;
+  std::atomic_bool opened_ = false;
 };
 
 }  // namespace pikiwidb
diff --git a/src/pikiwidb.cc b/src/pikiwidb.cc
index 76516df5c..0c9aab727 100644
--- a/src/pikiwidb.cc
+++ b/src/pikiwidb.cc
@@ -8,6 +8,8 @@
 //
 //  PikiwiDB.cc
 
+#include "pikiwidb.h"
+
 #include 
 #include 
 #include 
@@ -21,7 +23,6 @@
 #include "client.h"
 #include "config.h"
 #include "helper.h"
-#include "pikiwidb.h"
 #include "pikiwidb_logo.h"
 #include "slow_log.h"
 #include "store.h"
diff --git a/src/praft/praft.cc b/src/praft/praft.cc
index 4eb1e385e..ecb4b002c 100644
--- a/src/praft/praft.cc
+++ b/src/praft/praft.cc
@@ -9,6 +9,7 @@
 
 #include 
 
+#include "braft/snapshot.h"
 #include "braft/util.h"
 #include "brpc/server.h"
 
@@ -18,6 +19,8 @@
 #include "binlog.pb.h"
 #include "config.h"
 #include "pikiwidb.h"
+#include "praft.h"
+
 #include "praft_service.h"
 #include "replication.h"
 #include "store.h"
@@ -112,7 +115,6 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) {
     server_.reset();
     return ERROR_LOG_AND_STATUS("Failed to start server");
   }
-  // It's ok to start PRaft;
 
   assert(group_id.size() == RAFT_GROUPID_LEN);
   this->group_id_ = group_id;
@@ -514,6 +516,16 @@ butil::Status PRaft::RemovePeer(const std::string& peer) {
   return {0, "OK"};
 }
 
+butil::Status PRaft::DoSnapshot() {
+  if (!node_) {
+    return ERROR_LOG_AND_STATUS("Node is not initialized");
+  }
+  braft::SynchronizedClosure done;
+  node_->snapshot(&done);
+  done.wait();
+  return done.status();
+}
+
 void PRaft::OnClusterCmdConnectionFailed([[maybe_unused]] EventLoop* loop, const char* peer_ip, int port) {
   auto cli = cluster_cmd_ctx_.GetClient();
   if (cli) {
@@ -567,6 +579,26 @@ void PRaft::AppendLog(const Binlog& log, std::promise<rocksdb::Status>&& promise
   node_->apply(task);
 }
 
+int PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) {
+  assert(writer);
+  for (const auto& entry : std::filesystem::directory_iterator(dir)) {
+    if (entry.is_directory()) {
+      if (entry.path() != "." && entry.path() != "..") {
&& entry.path() != "..") { + DEBUG("dir_path = {}", entry.path().string()); + AddAllFiles(entry.path(), writer, path); + } + } else { + DEBUG("file_path = {}", std::filesystem::relative(entry.path(), path).string()); + if (writer->add_file(std::filesystem::relative(entry.path(), path)) != 0) { + ERROR("add file {} to snapshot fail!", entry.path().string()); + return -1; + } + } + } + return 0; +} + +// @braft::StateMachine void PRaft::Clear() { if (node_) { node_.reset(); @@ -607,9 +639,27 @@ void PRaft::on_apply(braft::Iterator& iter) { } } -void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {} +void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) { + assert(writer); + brpc::ClosureGuard done_guard(done); + auto path = writer->get_path(); + INFO("Saving snapshot to {}", path); + TasksVector tasks(1, {TaskType::kCheckpoint, db_id_, {{TaskArg::kCheckpointPath, path}}, true}); + PSTORE.HandleTaskSpecificDB(tasks); + if (auto res = AddAllFiles(path, writer, path); res != 0) { + done->status().set_error(EIO, "Fail to add file to writer"); + } +} -int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { return 0; } +int PRaft::on_snapshot_load(braft::SnapshotReader* reader) { + CHECK(!IsLeader()) << "Leader is not supposed to load snapshot"; + assert(reader); + auto reader_path = reader->get_path(); // xx/snapshot_0000001 + auto path = g_config.dbpath + std::to_string(db_id_); // db/db_id + TasksVector tasks(1, {TaskType::kLoadDBFromCheckpoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true}); + PSTORE.HandleTaskSpecificDB(tasks); + return 0; +} void PRaft::on_leader_start(int64_t term) { WARN("Node {} start to be leader, term={}", node_->node_id().to_string(), term); diff --git a/src/praft/praft.h b/src/praft/praft.h index 05fbded9a..c2ff888be 100644 --- a/src/praft/praft.h +++ b/src/praft/praft.h @@ -7,7 +7,12 @@ #pragma once +#include #include +#include +#include +#include +#include #include "braft/raft.h" #include "brpc/server.h" @@ -99,6 +104,7 @@ class PRaft : public braft::StateMachine { butil::Status AddPeer(const std::string& peer); butil::Status RemovePeer(const std::string& peer); butil::Status RaftRecvEntry(); + butil::Status DoSnapshot(); void ShutDown(); void Join(); @@ -148,6 +154,9 @@ class PRaft : public braft::StateMachine { void on_stop_following(const ::braft::LeaderChangeContext& ctx) override; void on_start_following(const ::braft::LeaderChangeContext& ctx) override; + private: + static int AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path); + private: std::unique_ptr server_{nullptr}; // brpc std::unique_ptr node_{nullptr}; @@ -156,6 +165,7 @@ class PRaft : public braft::StateMachine { ClusterCmdContext cluster_cmd_ctx_; // context for cluster join/remove command std::string group_id_; // group id + int db_id_ = 0; // db_id }; } // namespace pikiwidb diff --git a/src/pstd/pstd_string.cc b/src/pstd/pstd_string.cc index c10e99255..973656a2b 100755 --- a/src/pstd/pstd_string.cc +++ b/src/pstd/pstd_string.cc @@ -619,4 +619,10 @@ bool StringHasSpaces(const std::string& str) { return std::count_if(str.begin(), str.end(), [](unsigned char c) { return std::isspace(c); }); } +void TrimSlash(std::string& dirName) { + while (dirName.back() == '/') { + dirName.pop_back(); + } +} + } // namespace pstd \ No newline at end of file diff --git a/src/pstd/pstd_string.h b/src/pstd/pstd_string.h index ed8411bb4..d6ffd828a 100755 --- a/src/pstd/pstd_string.h +++ 
@@ -93,4 +93,6 @@ std::string RandomStringWithNumber(size_t len);
 
 bool StringHasSpaces(const std::string& str);
 
+void TrimSlash(std::string& dirName);
+
 }  // namespace pstd
diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h
index bec856b58..aad21e337 100644
--- a/src/storage/include/storage/storage.h
+++ b/src/storage/include/storage/storage.h
@@ -46,6 +46,7 @@ inline constexpr size_t BATCH_DELETE_LIMIT = 100;
 inline constexpr size_t COMPACT_THRESHOLD_COUNT = 2000;
 
 inline constexpr uint64_t kNoFlush = std::numeric_limits<uint64_t>::max();
+inline constexpr uint64_t kFlush = 0;
 
 using Options = rocksdb::Options;
 using BlockBasedTableOptions = rocksdb::BlockBasedTableOptions;
@@ -182,6 +183,8 @@ class Storage {
 
   Status CreateCheckpoint(const std::string& dump_path, int index);
 
+  Status LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int index);
+
   Status LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key);
 
   Status StoreCursorStartKey(const DataType& dtype, int64_t cursor, char type, const std::string& next_key);
diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc
index 10febd50a..ed8ee7d3c 100644
--- a/src/storage/src/redis.cc
+++ b/src/storage/src/redis.cc
@@ -51,7 +51,12 @@ Redis::~Redis() {
   for (auto handle : tmp_handles) {
     delete handle;
   }
+  // delete env_;
   delete db_;
+
+  if (default_compact_range_options_.canceled) {
+    delete default_compact_range_options_.canceled;
+  }
 }
 
 Status Redis::Open(const StorageOptions& storage_options, const std::string& db_path) {
diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc
index cccf48a47..fe7a3c943 100644
--- a/src/storage/src/storage.cc
+++ b/src/storage/src/storage.cc
@@ -4,6 +4,7 @@
 //  of patent rights can be found in the PATENTS file in the same directory.
 
 #include 
+#include 
 #include 
 #include 
 
@@ -11,6 +12,7 @@
 #include "config.h"
 #include "pstd/log.h"
 #include "pstd/pikiwidb_slot.h"
+#include "pstd/pstd_string.h"
 #include "rocksdb/utilities/checkpoint.h"
 #include "scope_snapshot.h"
 #include "src/lru_cache.h"
@@ -23,6 +25,9 @@
 #include "storage/storage.h"
 #include "storage/util.h"
 
+#define PRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta"
+#define SST_FILE_EXTENSION ".sst"
+
 namespace storage {
 extern std::string BitOpOperate(BitOpType op, const std::vector<std::string>& src_values, int64_t max_len);
 class Redis;
@@ -64,13 +69,14 @@ Storage::Storage() {
 }
 
 Storage::~Storage() {
-  bg_tasks_should_exit_ = true;
+  bg_tasks_should_exit_.store(true);
   bg_tasks_cond_var_.notify_one();
-
-  if (is_opened_) {
-    for (auto& inst : insts_) {
-      inst.reset();
+  if (is_opened_.load()) {
+    int ret = 0;
+    if (ret = pthread_join(bg_tasks_thread_id_, nullptr); ret != 0) {
+      ERROR("pthread_join failed with bgtask thread error : {}", ret);
     }
+    insts_.clear();
   }
 }
 
@@ -82,6 +88,42 @@ static std::string AppendSubDirectory(const std::string& db_path, int index) {
   }
 }
 
+static int RecursiveLinkAndCopy(const std::filesystem::path& source, const std::filesystem::path& destination) {
+  if (std::filesystem::is_regular_file(source)) {
+    if (source.filename() == PRAFT_SNAPSHOT_META_FILE) {
+      return 0;
+    } else if (source.extension() == SST_FILE_EXTENSION) {
+      // Create a hard link
+      if (::link(source.c_str(), destination.c_str()) != 0) {
+        WARN("hard link file {} fail", source.string());
+        return -1;
+      }
+      DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
+    } else {
+      // Copy the file
+      if (!std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing)) {
+        WARN("copy file {} fail", source.string());
+        return -1;
+      }
+      DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
+    }
+  } else {
+    if (!pstd::FileExists(destination)) {
+      if (pstd::CreateDir(destination) != 0) {
+        WARN("create dir {} fail", destination.string());
+        return -1;
+      }
+    }
+
+    for (const auto& entry : std::filesystem::directory_iterator(source)) {
+      if (RecursiveLinkAndCopy(entry.path(), destination / entry.path().filename()) != 0) {
+        return -1;
+      }
+    }
+  }
+  return 0;
+}
+
 Status Storage::Open(const StorageOptions& storage_options, const std::string& db_path) {
   mkpath(db_path.c_str(), 0755);
   db_instance_num_ = storage_options.db_instance_num;
@@ -105,20 +147,12 @@ Status Storage::Open(const StorageOptions& storage_options, const std::string& d
 
 Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
   INFO("DB{}'s RocksDB {} begin to generate a checkpoint!", db_id_, i);
   auto source_dir = AppendSubDirectory(dump_path, db_id_);
-  if (!pstd::FileExists(source_dir)) {
-    if (0 != pstd::CreatePath(source_dir)) {
-      WARN("Create Dir {} fail!", source_dir);
-      return Status::IOError("CreatePath() fail! dir_name : {} ", source_dir);
-    }
-    INFO("Create Dir {} success!", source_dir);
-  }
-
   source_dir = AppendSubDirectory(source_dir, i);
   auto tmp_dir = source_dir + ".tmp";
   // 1) Make sure the temporary directory does not exist
   if (!pstd::DeleteDirIfExist(tmp_dir)) {
-    WARN("DB{}'s RocksDB {} delete dir fail!", db_id_, i);
+    WARN("DB{}'s RocksDB {} delete directory fail!", db_id_, i);
     return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", tmp_dir);
   }
 
@@ -133,7 +167,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
 
   // 3) Create a checkpoint
   std::unique_ptr<rocksdb::Checkpoint> checkpoint_guard(checkpoint);
-  s = checkpoint->CreateCheckpoint(tmp_dir, kNoFlush, nullptr);
+  s = checkpoint->CreateCheckpoint(tmp_dir, kFlush, nullptr);
   if (!s.ok()) {
     WARN("DB{}'s RocksDB {} create checkpoint failed!. Error: {}", db_id_, i, s.ToString());
     return s;
@@ -141,7 +175,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
 
   // 4) Make sure the source directory does not exist
   if (!pstd::DeleteDirIfExist(source_dir)) {
-    WARN("DB{}'s RocksDB {} delete dir {} fail!", db_id_, i, source_dir);
+    WARN("DB{}'s RocksDB {} delete directory {} fail!", db_id_, i, source_dir);
     return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", source_dir);
dir_name : {} ", source_dir); } @@ -152,13 +186,48 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { if (!pstd::DeleteDirIfExist(tmp_dir)) { WARN("DB{}'s RocksDB {} fail to delete the rename failed directory {} ", db_id_, i, tmp_dir); } - return Status::IOError("Rename dir {} fail!", tmp_dir); + return Status::IOError("Rename directory {} fail!", tmp_dir); } INFO("DB{}'s RocksDB {} create checkpoint {} success!", db_id_, i, source_dir); return Status::OK(); } +Status Storage::LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int i) { + auto rocksdb_checkpoint_path = AppendSubDirectory(dump_path, i); + INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}", db_id_, i, rocksdb_checkpoint_path); + auto rocksdb_path = AppendSubDirectory(db_path, i); // ./db/db_id/i + auto tmp_rocksdb_path = rocksdb_path + ".tmp"; // ./db/db_id/i.tmp + insts_[i].reset(); + + // 1) Rename the original db to db.tmp, and only perform the maximum possible recovery of data + // when loading the checkpoint fails. + if (auto status = pstd::RenameFile(rocksdb_path, tmp_rocksdb_path); status != 0) { + WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path, + tmp_rocksdb_path); + return Status::IOError("Rename directory {} fail!", db_path); + } + + // 2) Create a db directory to save the checkpoint. + if (0 != pstd::CreatePath(rocksdb_path)) { + pstd::RenameFile(tmp_rocksdb_path, rocksdb_path); + WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path); + return Status::IOError("Create directory {} fail!", rocksdb_path); + } + if (RecursiveLinkAndCopy(rocksdb_checkpoint_path, rocksdb_path) != 0) { + pstd::DeleteDir(rocksdb_path); + pstd::RenameFile(tmp_rocksdb_path, rocksdb_path); + WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path); + return Status::IOError("recursive link and copy directory {} fail!", rocksdb_path); + } + + // 3) Destroy the db.tmp directory. 
+  if (auto s = rocksdb::DestroyDB(tmp_rocksdb_path, rocksdb::Options()); !s.ok()) {
+    WARN("Failure to destroy the old DB, path = {}", tmp_rocksdb_path);
+  }
+  return Status::OK();
+}
+
 Status Storage::LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key) {
   std::string index_key = DataTypeTag[dtype] + std::to_string(cursor);
   std::string index_value;
@@ -1983,9 +2052,9 @@ Status Storage::AddBGTask(const BGTask& bg_task) {
 
 Status Storage::RunBGTask() {
   BGTask task;
-  while (!bg_tasks_should_exit_) {
+  while (!bg_tasks_should_exit_.load()) {
     std::unique_lock lock(bg_tasks_mutex_);
-    bg_tasks_cond_var_.wait(lock, [this]() { return !bg_tasks_queue_.empty() || bg_tasks_should_exit_; });
+    bg_tasks_cond_var_.wait(lock, [this]() { return !bg_tasks_queue_.empty() || bg_tasks_should_exit_.load(); });
 
     if (!bg_tasks_queue_.empty()) {
       task = bg_tasks_queue_.front();
@@ -1993,7 +2062,7 @@ Status Storage::RunBGTask() {
     }
     lock.unlock();
 
-    if (bg_tasks_should_exit_) {
+    if (bg_tasks_should_exit_.load()) {
       return Status::Incomplete("bgtask return with bg_tasks_should_exit true");
     }
 
diff --git a/src/store.cc b/src/store.cc
index ca739e5b0..e3c9e3560 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -7,9 +7,9 @@
 
 #include 
 
-#include "checkpoint_manager.h"
 #include "config.h"
 #include "log.h"
+#include "pstd/pstd_string.h"
 #include "store.h"
 
 namespace pikiwidb {
@@ -24,11 +24,11 @@ void PStore::Init() {
     return;
   }
 
-  dbNum_ = g_config.databases;
-  backends_.reserve(dbNum_);
+  db_number_ = g_config.databases;
+  backends_.reserve(db_number_);
   if (g_config.backend == kBackEndRocksDB) {
-    for (int i = 0; i < dbNum_; i++) {
-      auto db = std::make_unique<DB>(i, g_config.dbpath);
+    for (int i = 0; i < db_number_; i++) {
+      auto db = std::make_unique<DB>(i, g_config.dbpath, g_config.db_instance_num);
       backends_.push_back(std::move(db));
     }
   } else {
@@ -36,41 +36,41 @@ void PStore::Init() {
   }
 }
 
-void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) {
+void PStore::HandleTaskSpecificDB(const TasksVector& tasks) {
   std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) {
+    if (task.db < 0 || task.db >= db_number_) {
+      WARN("The database index is out of range.");
+      return;
+    }
+    auto& db = backends_.at(task.db);
     switch (task.type) {
       case kCheckpoint: {
-        if (task.db < 0 || task.db >= dbNum_) {
-          WARN("The database index is out of range.");
+        if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) {
+          WARN("The critical parameter 'path' is missing for creating a checkpoint.");
           return;
         }
-        auto& db = backends_[task.db];
+        auto path = task.args.find(kCheckpointPath)->second;
+        pstd::TrimSlash(path);
+        db->CreateCheckpoint(path, task.sync);
+        break;
+      }
+      case kLoadDBFromCheckpoint: {
         if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) {
-          WARN("The critical parameter 'path' is missing in the checkpoint.");
+          WARN("The critical parameter 'path' is missing for loading a checkpoint.");
           return;
         }
         auto path = task.args.find(kCheckpointPath)->second;
-        trimSlash(path);
-        db->CreateCheckpoint(path);
+        pstd::TrimSlash(path);
+        db->LoadDBFromCheckpoint(path, task.sync);
+        break;
+      }
+      case kEmpty: {
+        WARN("An empty task was passed in; not doing anything.");
         break;
-      }
-
+      }
       default:
         break;
     }
   });
 }
-
-void PStore::WaitForCheckpointDone() {
-  for (auto& db : backends_) {
-    db->WaitForCheckpointDone();
-  }
-}
-
-void PStore::trimSlash(std::string& dirName) {
-  while (dirName.back() == '/') {
-    dirName.pop_back();
-  }
-}
-
 }  // namespace pikiwidb
diff --git a/src/store.h b/src/store.h
index 7cc26331f..4bf15c5f3 100644
--- a/src/store.h
+++ b/src/store.h
@@ -10,7 +10,6 @@
 #define GLOG_NO_ABBREVIATED_SEVERITIES
 
 #include 
-#include <shared_mutex>
 #include 
 #include 
 
@@ -18,28 +17,28 @@
 
 namespace pikiwidb {
 
-enum TaskType {
-  kCheckpoint,
-};
+enum TaskType { kCheckpoint = 0, kLoadDBFromCheckpoint, kEmpty };
 
 enum TaskArg {
-  kCheckpointPath,
+  kCheckpointPath = 0,
 };
 
 struct TaskContext {
-  TaskType type;
-  int db;
+  TaskType type = kEmpty;
+  int db = -1;
   std::map<TaskArg, std::string> args;
-  TaskContext(TaskType t) : type(t) {}
-  TaskContext(TaskType t, int d) : type(t), db(d) {}
-  TaskContext(TaskType t, int d, const std::map<TaskArg, std::string>& a) : type(t), db(d), args(a) {}
+  bool sync = false;
+  TaskContext() = delete;
+  TaskContext(TaskType t, bool s = false) : type(t), sync(s) {}
+  TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {}
+  TaskContext(TaskType t, int d, const std::map<TaskArg, std::string>& a, bool s = false)
+      : type(t), db(d), args(a), sync(s) {}
 };
 
 using TasksVector = std::vector<TaskContext>;
 
 class PStore {
  public:
-  friend class CheckpointManager;
   static PStore& Instance();
 
   PStore(const PStore&) = delete;
@@ -49,24 +48,15 @@
 
   std::unique_ptr<DB>& GetBackend(int32_t index) { return backends_[index]; };
 
-  void DoSomeThingSpecificDB(const TasksVector& task);
-
-  void WaitForCheckpointDone();
+  void HandleTaskSpecificDB(const TasksVector& task);
 
-  std::shared_mutex& SharedMutex() { return dbs_mutex_; }
+  int GetDBNumber() const { return db_number_; }
 
  private:
  PStore() = default;
-  void trimSlash(std::string& dirName);
-  int dbNum_ = 0;
+  int db_number_ = 0;
 
-  /**
-   * If you want to access all the DBs at the same time,
-   * then you must hold the lock.
-   * For example: you want to execute flushall or bgsave.
-   */
-  std::shared_mutex dbs_mutex_;
  std::vector<std::unique_ptr<DB>> backends_;
 };
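
For reference, here is a minimal C++ sketch (not part of the diff) of how the task API introduced in `store.h` is intended to be driven, mirroring what `PRaft::on_snapshot_save` does above. The helper names `CheckpointOneDB` and `TriggerSnapshot` are illustrative assumptions; only `TaskContext`, `TasksVector`, `PSTORE.HandleTaskSpecificDB`, and `PRAFT.DoSnapshot` come from this PR.

```cpp
#include <string>

#include "praft/praft.h"  // PRAFT and PRaft::DoSnapshot (added by this PR)
#include "store.h"        // PSTORE, TaskContext, TasksVector (changed by this PR)

// Hypothetical helper: synchronously checkpoint one DB into checkpoint_dir,
// using the same TaskContext shape that PRaft::on_snapshot_save builds.
void CheckpointOneDB(int db_index, const std::string& checkpoint_dir) {
  pikiwidb::TasksVector tasks(1, {pikiwidb::TaskType::kCheckpoint, db_index,
                                  {{pikiwidb::TaskArg::kCheckpointPath, checkpoint_dir}},
                                  /*sync=*/true});
  PSTORE.HandleTaskSpecificDB(tasks);
}

// Hypothetical helper: ask braft to take a snapshot; braft then calls back into
// PRaft::on_snapshot_save, which dispatches the kCheckpoint task shown above.
butil::Status TriggerSnapshot() { return PRAFT.DoSnapshot(); }
```

The same pattern with `kLoadDBFromCheckpoint` is what `on_snapshot_load` runs on followers, and `RAFT.NODE DOSNAPSHOT` (exercised twice in `save_load.sh`) is the command-level entry point into `DoSnapshot()`.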