diff --git a/save_load.sh b/save_load.sh
index 4a6ad251b..e4a6bb801 100755
--- a/save_load.sh
+++ b/save_load.sh
@@ -10,8 +10,8 @@
 redis-cli -p 7777 raft.cluster init
 
 redis-benchmark -p 7777 -c 5 -n 10000 -r 10000000 -d 1024 -t set
 
-redis-cli -p 7777 raft.node DSS
-redis-cli -p 7777 raft.node DSS
+redis-cli -p 7777 raft.node DOSNAPSHOT
+redis-cli -p 7777 raft.node DOSNAPSHOT
 
 redis-cli -p 8888 raft.cluster join 127.0.0.1:7777
diff --git a/src/checkpoint_manager.cc b/src/checkpoint_manager.cc
deleted file mode 100644
index 72c39cb0a..000000000
--- a/src/checkpoint_manager.cc
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- */
-
-#include "checkpoint_manager.h"
-#include "db.h"
-#include "log.h"
-#include "pstd/env.h"
-
-namespace pikiwidb {
-
-void CheckpointManager::Init(int instNum, DB* db) {
-  checkpoint_num_ = instNum;
-  res_.reserve(checkpoint_num_);
-  db_ = db;
-}
-
-void CheckpointManager::CreateCheckpoint(const std::string& path) {
-  res_.clear();
-  std::lock_guard Lock(shared_mutex_);
-  for (int i = 0; i < checkpoint_num_; ++i) {
-    auto res = std::async(std::launch::async, &DB::DoBgSave, db_, path, i);
-    res_.push_back(std::move(res));
-  }
-}
-
-void CheckpointManager::WaitForCheckpointDone() {
-  for (auto& r : res_) {
-    r.get();
-  }
-}
-
-}  // namespace pikiwidb
diff --git a/src/checkpoint_manager.h b/src/checkpoint_manager.h
deleted file mode 100644
index 065027424..000000000
--- a/src/checkpoint_manager.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2023-present, Qihoo, Inc. All rights reserved.
- * This source code is licensed under the BSD-style license found in the
- * LICENSE file in the root directory of this source tree. An additional grant
- * of patent rights can be found in the PATENTS file in the same directory.
- */
-
-#pragma once
-
-#include <future>
-#include <shared_mutex>
-#include <vector>
-
-namespace pikiwidb {
-
-class DB;
-
-class CheckpointManager {
- public:
-  CheckpointManager() = default;
-  ~CheckpointManager() = default;
-
-  void Init(int instNum, DB* db);
-
-  void CreateCheckpoint(const std::string& path);
-
-  void WaitForCheckpointDone();
-
- private:
-  int checkpoint_num_ = 0;
-  std::vector<std::future<void>> res_;
-  DB* db_ = nullptr;
-
-  std::shared_mutex shared_mutex_;
-};
-
-}  // namespace pikiwidb
diff --git a/src/db.cc b/src/db.cc
index 26decaa26..a56f7dd93 100644
--- a/src/db.cc
+++ b/src/db.cc
@@ -7,7 +7,6 @@
 
 #include "db.h"
 
-#include "checkpoint_manager.h"
 #include "config.h"
 #include "praft/praft.h"
 #include "pstd/log.h"
@@ -16,11 +15,11 @@ extern pikiwidb::PConfig g_config;
 
 namespace pikiwidb {
 
-DB::DB(int db_index, const std::string& db_path)
-    : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/') {
+DB::DB(int db_index, const std::string& db_path, int rocksdb_inst_num)
+    : db_index_(db_index), db_path_(db_path + std::to_string(db_index_) + '/'), rocksdb_inst_num_(rocksdb_inst_num) {
   storage::StorageOptions storage_options;
   storage_options.options.create_if_missing = true;
-  storage_options.db_instance_num = g_config.db_instance_num;
+  storage_options.db_instance_num = rocksdb_inst_num_;
   storage_options.db_id = db_index_;
 
   // options for CF
@@ -37,31 +36,97 @@ DB::DB(int db_index, const std::string& db_path)
     ERROR("Storage open failed! {}", s.ToString());
{}", s.ToString()); abort(); } - - checkpoint_manager_ = std::make_unique(); - checkpoint_manager_->Init(g_config.db_instance_num, this); - opened_ = true; INFO("Open DB{} success!", db_index_); } -void DB::DoBgSave(const std::string& path, int i) { +void DB::DoCheckpoint(const std::string& path, int i) { // 1) always hold storage's sharedLock std::shared_lock sharedLock(storage_mutex_); - // 2)Create the storage's checkpoint 。 + // 2)Create the checkpoint of rocksdb i. auto status = storage_->CreateCheckpoint(path, i); } -void DB::CreateCheckpoint(const std::string& path) { +void DB::LoadCheckpoint(const std::string& path, const std::string& db_path, int i) { + // 1) Already holding the mutual exclusion lock + + // 2) Load the checkpoint of rocksdb i. + auto status = storage_->LoadCheckpoint(path, db_path, i); +} + +void DB::CreateCheckpoint(const std::string& path, bool sync) { auto tmp_path = path + '/' + std::to_string(db_index_); if (0 != pstd::CreatePath(tmp_path)) { WARN("Create dir {} fail !", tmp_path); return; } - checkpoint_manager_->CreateCheckpoint(path); + + std::vector> result; + result.reserve(rocksdb_inst_num_); + for (int i = 0; i < rocksdb_inst_num_; ++i) { + // In a new thread, create a checkpoint for the specified rocksdb i + // In DB::DoBgSave, a read lock is always held to protect the Storage + // corresponding to this rocksdb i. + auto res = std::async(std::launch::async, &DB::DoCheckpoint, this, path, i); + result.push_back(std::move(res)); + } + if (sync) { + for (auto& r : result) { + r.get(); + } + } } -void DB::WaitForCheckpointDone() { checkpoint_manager_->WaitForCheckpointDone(); } +void DB::LoadDBFromCheckPoint(const std::string& path, bool sync) { + opened_.store(false); + // 对于每一个 rocksdb 分别去 Load 自己的 DB. + auto checkpoint_path = path + '/' + std::to_string(db_index_); + if (0 != pstd::IsDir(path)) { + WARN("Checkpoint dir {} does not exist!", checkpoint_path); + return; + } + if (0 != pstd::IsDir(db_path_)) { + if (0 != pstd::CreateDir(db_path_)) { + WARN("Create dir {} fail !", db_path_); + return; + } + } + + std::lock_guard lock(storage_mutex_); + std::vector> result; + result.reserve(rocksdb_inst_num_); + for (int i = 0; i < rocksdb_inst_num_; ++i) { + // In a new thread, Load a checkpoint for the specified rocksdb i + // In DB::DoBgSave, a read lock is always held to protect the Storage + // corresponding to this rocksdb i. + auto res = std::async(std::launch::async, &DB::LoadCheckpoint, this, checkpoint_path, db_path_, i); + result.push_back(std::move(res)); + } + for (auto& r : result) { + r.get(); + } + // 重新启动 + storage::StorageOptions storage_options; + storage_options.options.create_if_missing = true; + storage_options.db_instance_num = rocksdb_inst_num_; + storage_options.db_id = db_index_; + + // options for CF + storage_options.options.ttl = g_config.rocksdb_ttl_second; + storage_options.options.periodic_compaction_seconds = g_config.rocksdb_periodic_second; + if (g_config.use_raft) { + storage_options.append_log_function = [&r = PRAFT](const Binlog& log, std::promise&& promise) { + r.AppendLog(log, std::move(promise)); + }; + } + storage_ = std::make_unique(); + if (auto s = storage_->Open(storage_options, db_path_); !s.ok()) { + ERROR("Storage open failed! 
{}", s.ToString()); + abort(); + } + opened_.store(true); + INFO("DB{} load a checkpoint from {} success!", db_index_, path); +} } // namespace pikiwidb diff --git a/src/db.h b/src/db.h index 11dc3a207..2794ac440 100644 --- a/src/db.h +++ b/src/db.h @@ -7,15 +7,14 @@ #pragma once +#include #include "storage/storage.h" namespace pikiwidb { -class CheckpointManager; - class DB { public: - DB(int db_index, const std::string& db_path); + DB(int db_index, const std::string& db_path, int rocksdb_inst_num); std::unique_ptr& GetStorage() { return storage_; } @@ -27,20 +26,20 @@ class DB { void UnLockShared() { storage_mutex_.unlock_shared(); } - void CreateCheckpoint(const std::string& path); - - [[maybe_unused]] void DoBgSave(const std::string&, int i); + void CreateCheckpoint(const std::string& path, bool sync); - void WaitForCheckpointDone(); + void LoadDBFromCheckPoint(const std::string& path, bool sync = false); int GetDbIndex() { return db_index_; } + private: + void DoCheckpoint(const std::string&, int i); + void LoadCheckpoint(const std::string&, const std::string& db_path, int i); + private: const int db_index_ = 0; const std::string db_path_; - const std::string dump_parent_path_; - const std::string dump_path_; - + int rocksdb_inst_num_ = 0; /** * If you want to change the pointer that points to storage, * you must first acquire a mutex lock. @@ -49,9 +48,7 @@ class DB { */ std::shared_mutex storage_mutex_; std::unique_ptr storage_; - bool opened_ = false; - - std::unique_ptr checkpoint_manager_; + std::atomic_bool opened_ = false; }; } // namespace pikiwidb diff --git a/src/praft/praft.cc b/src/praft/praft.cc index 0dbecbab1..4489beae3 100644 --- a/src/praft/praft.cc +++ b/src/praft/praft.cc @@ -115,7 +115,6 @@ butil::Status PRaft::Init(std::string& group_id, bool initial_conf_is_null) { server_.reset(); return ERROR_LOG_AND_STATUS("Failed to start server"); } - // It's ok to start PRaft; assert(group_id.size() == RAFT_GROUPID_LEN); this->group_id_ = group_id; @@ -580,7 +579,7 @@ void PRaft::AppendLog(const Binlog& log, std::promise&& promise node_->apply(task); } -void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { +int PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path) { assert(writer); for (const auto& entry : std::filesystem::directory_iterator(dir)) { if (entry.is_directory()) { @@ -591,34 +590,12 @@ void PRaft::AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* } else { DEBUG("file_path = {}", std::filesystem::relative(entry.path(), path).string()); if (writer->add_file(std::filesystem::relative(entry.path(), path)) != 0) { - ERROR("add file error!"); + ERROR("add file {} to snapshot fail!", entry.path().string()); + return -1; } } } -} - -void PRaft::RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination) { - if (std::filesystem::is_regular_file(source)) { - if (source.filename() == PBRAFT_SNAPSHOT_META_FILE) { - return; - } else if (source.extension() == ".sst") { - // Create a hard link - DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string()); - ::link(source.c_str(), destination.c_str()); - } else { - // Copy the file - DEBUG("copy success! 
-      std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing);
-    }
-  } else {
-    if (!pstd::FileExists(destination)) {
-      pstd::CreateDir(destination);
-    }
-
-    for (const auto& entry : std::filesystem::directory_iterator(source)) {
-      RecursiveCopy(entry.path(), destination / entry.path().filename());
-    }
-  }
+  return 0;
 }
 
 // @braft::StateMachine
@@ -663,30 +640,24 @@ void PRaft::on_apply(braft::Iterator& iter) {
 }
 
 void PRaft::on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) {
+  assert(writer);
   brpc::ClosureGuard done_guard(done);
-  TasksVector tasks;
-  tasks.reserve(g_config.databases);
-  for (auto i = 0; i < g_config.databases; ++i) {
-    tasks.push_back({TaskType::kCheckpoint, i, {{TaskArg::kCheckpointPath, writer->get_path()}}});
-  }
+  auto path = writer->get_path();
+  INFO("Saving snapshot to {}", path);
+  TasksVector tasks(1, {TaskType::kCheckpoint, db_id_, {{TaskArg::kCheckpointPath, path}}, true});
   PSTORE.DoSomeThingSpecificDB(tasks);
-  PSTORE.WaitForCheckpointDone();
-  auto writer_path = writer->get_path();
-  AddAllFiles(writer_path, writer, writer_path);
+  if (auto res = AddAllFiles(path, writer, path); res != 0) {
+    done->status().set_error(EIO, "Fail to add file to writer");
+  }
 }
 
 int PRaft::on_snapshot_load(braft::SnapshotReader* reader) {
   CHECK(!IsLeader()) << "Leader is not supposed to load snapshot";
-  auto reader_path = reader->get_path();  // xx/snapshot_0000001
-  auto db_path = g_config.dbpath;
-  PSTORE.Clear();
-  for (int i = 0; i < g_config.databases; i++) {
-    auto sub_path = db_path + std::to_string(i);
-    pstd::DeleteDirIfExist(sub_path);
-  }
-  db_path.pop_back();
-  RecursiveCopy(reader_path, db_path);
-  PSTORE.Init();
+  assert(reader);
+  auto reader_path = reader->get_path();  // xx/snapshot_0000001
+  auto path = g_config.dbpath + std::to_string(db_id_);  // db/db_id
+  TasksVector tasks(1, {TaskType::kLoadDBFromCheckPoint, db_id_, {{TaskArg::kCheckpointPath, reader_path}}, true});
+  PSTORE.DoSomeThingSpecificDB(tasks);
   return 0;
 }
diff --git a/src/praft/praft.h b/src/praft/praft.h
index 6eeb8d4b6..c2ff888be 100644
--- a/src/praft/praft.h
+++ b/src/praft/praft.h
@@ -22,7 +22,6 @@
 
 namespace pikiwidb {
 
-#define PBRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta"
 #define RAFT_GROUPID_LEN 32
 #define OK "+OK"
 
@@ -156,9 +155,7 @@ class PRaft : public braft::StateMachine {
   void on_start_following(const ::braft::LeaderChangeContext& ctx) override;
 
  private:
-  void AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path);
-
-  void RecursiveCopy(const std::filesystem::path& source, const std::filesystem::path& destination);
+  static int AddAllFiles(const std::filesystem::path& dir, braft::SnapshotWriter* writer, const std::string& path);
 
  private:
   std::unique_ptr<brpc::Server> server_{nullptr};  // brpc
@@ -168,6 +165,7 @@ class PRaft : public braft::StateMachine {
   ClusterCmdContext cluster_cmd_ctx_;  // context for cluster join/remove command
 
   std::string group_id_;  // group id
+  int db_id_ = 0;         // db_id
 };
 
 }  // namespace pikiwidb
diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h
index 35ffb4e21..aad21e337 100644
--- a/src/storage/include/storage/storage.h
+++ b/src/storage/include/storage/storage.h
@@ -183,6 +183,8 @@ class Storage {
 
   Status CreateCheckpoint(const std::string& dump_path, int index);
 
+  Status LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int index);
+
   Status LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key);
 
   Status StoreCursorStartKey(const DataType& dtype, int64_t cursor, char type, const std::string& next_key);
diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc
index 222045a13..ff2881cb0 100644
--- a/src/storage/src/storage.cc
+++ b/src/storage/src/storage.cc
@@ -4,6 +4,7 @@
 //  of patent rights can be found in the PATENTS file in the same directory.
 
 #include 
+#include 
 #include 
 #include 
 
@@ -23,6 +24,9 @@
 #include "storage/storage.h"
 #include "storage/util.h"
 
+#define PRAFT_SNAPSHOT_META_FILE "__raft_snapshot_meta"
+#define SST_FILE_EXTENSION ".sst"
+
 namespace storage {
 extern std::string BitOpOperate(BitOpType op, const std::vector<std::string>& src_values, int64_t max_len);
 class Redis;
@@ -82,6 +86,42 @@ static std::string AppendSubDirectory(const std::string& db_path, int index) {
   }
 }
 
+static int RecursiveLinkAndCopy(const std::filesystem::path& source, const std::filesystem::path& destination) {
+  if (std::filesystem::is_regular_file(source)) {
+    if (source.filename() == PRAFT_SNAPSHOT_META_FILE) {
+      return 0;
+    } else if (source.extension() == SST_FILE_EXTENSION) {
+      // Create a hard link
+      DEBUG("hard link success! source_file = {} , destination_file = {}", source.string(), destination.string());
+      if (::link(source.c_str(), destination.c_str()) < 0) {
+        WARN("hard link file {} fail", source.string());
+        return -1;
+      }
+    } else {
+      // Copy the file
+      DEBUG("copy success! source_file = {} , destination_file = {}", source.string(), destination.string());
+      if (!std::filesystem::copy_file(source, destination, std::filesystem::copy_options::overwrite_existing)) {
+        WARN("copy file {} fail", source.string());
+        return -1;
+      }
+    }
+  } else {
+    if (!pstd::FileExists(destination)) {
+      if (pstd::CreateDir(destination) != 0) {
+        WARN("create dir {} fail", destination.string());
+        return -1;
+      }
+    }
+
+    for (const auto& entry : std::filesystem::directory_iterator(source)) {
+      if (RecursiveLinkAndCopy(entry.path(), destination / entry.path().filename()) != 0) {
+        return -1;
+      }
+    }
+  }
+  return 0;
+}
+
 Status Storage::Open(const StorageOptions& storage_options, const std::string& db_path) {
   mkpath(db_path.c_str(), 0755);
   db_instance_num_ = storage_options.db_instance_num;
@@ -110,7 +150,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
   auto tmp_dir = source_dir + ".tmp";
   // 1) Make sure the temporary directory does not exist
   if (!pstd::DeleteDirIfExist(tmp_dir)) {
-    WARN("DB{}'s RocksDB {} delete dir fail!", db_id_, i);
+    WARN("DB{}'s RocksDB {} delete directory fail!", db_id_, i);
     return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", tmp_dir);
   }
 
@@ -133,7 +173,7 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) {
 
   // 4) Make sure the source directory does not exist
   if (!pstd::DeleteDirIfExist(source_dir)) {
-    WARN("DB{}'s RocksDB {} delete dir {} fail!", db_id_, i, source_dir);
+    WARN("DB{}'s RocksDB {} delete directory {} fail!", db_id_, i, source_dir);
     return Status::IOError("DeleteDirIfExist() fail! dir_name : {} ", source_dir);
dir_name : {} ", source_dir); } @@ -144,13 +184,45 @@ Status Storage::CreateCheckpoint(const std::string& dump_path, int i) { if (!pstd::DeleteDirIfExist(tmp_dir)) { WARN("DB{}'s RocksDB {} fail to delete the rename failed directory {} ", db_id_, i, tmp_dir); } - return Status::IOError("Rename dir {} fail!", tmp_dir); + return Status::IOError("Rename directory {} fail!", tmp_dir); } INFO("DB{}'s RocksDB {} create checkpoint {} success!", db_id_, i, source_dir); return Status::OK(); } +Status Storage::LoadCheckpoint(const std::string& dump_path, const std::string& db_path, int i) { + auto rocksdb_checkpoint_path = AppendSubDirectory(db_path, i); + INFO("DB{}'s RocksDB {} begin to load a checkpoint from {}!", db_id_, i, rocksdb_checkpoint_path); + + // 首先将原来的 db path 改名, 当 load 失败的时候保证原来的数据还在. + auto tmp_path = db_path + ".tmp"; + if (auto status = pstd::RenameFile(db_path, tmp_path); status != 0) { + WARN("DB{}'s RocksDB {} rename db directory {} to temporary directory {} fail!", db_id_, i, db_path, tmp_path); + return Status::IOError("Rename directory {} fail!", db_path); + } + + if (0 != pstd::CreateDir(db_path)) { + pstd::RenameFile(tmp_path, db_path); + WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path); + return Status::IOError("Create directory {} fail!", db_path); + } + + // 将原来的数据拷贝到 DB 目录下. + if (RecursiveLinkAndCopy(dump_path, db_path) != 0) { + pstd::DeleteDir(db_path); + pstd::RenameFile(tmp_path, db_path); + WARN("DB{}'s RocksDB {} load a checkpoint from {} fail!", db_id_, i, rocksdb_checkpoint_path); + return Status::IOError("recursive link and copy directory {} fail!", dump_path); + } + + // 删除掉 tmp + if (auto s = rocksdb::DestroyDB(tmp_path, rocksdb::Options()); !s.ok()) { + WARN("Failure to destroy the old DB, path = {}", tmp_path); + } + return Status::OK(); +} + Status Storage::LoadCursorStartKey(const DataType& dtype, int64_t cursor, char* type, std::string* start_key) { std::string index_key = DataTypeTag[dtype] + std::to_string(cursor); std::string index_value; diff --git a/src/store.cc b/src/store.cc index 7b3c59a51..c67da7127 100644 --- a/src/store.cc +++ b/src/store.cc @@ -7,7 +7,6 @@ #include -#include "checkpoint_manager.h" #include "config.h" #include "log.h" #include "store.h" @@ -28,7 +27,7 @@ void PStore::Init() { backends_.reserve(dbNum_); if (g_config.backend == kBackEndRocksDB) { for (int i = 0; i < dbNum_; i++) { - auto db = std::make_unique(i, g_config.dbpath); + auto db = std::make_unique(i, g_config.dbpath, g_config.db_instance_num); backends_.push_back(std::move(db)); } } else { @@ -36,39 +35,45 @@ void PStore::Init() { } } -void PStore::Clear() { backends_.clear(); } +void PStore::Clear() { + std::lock_guard lock(dbs_mutex_); + backends_.clear(); +} void PStore::DoSomeThingSpecificDB(const TasksVector& tasks) { std::for_each(tasks.begin(), tasks.end(), [this](const auto& task) { + if (task.db < 0 || task.db >= dbNum_) { + WARN("The database index is out of range."); + return; + } + auto& db = backends_[task.db]; switch (task.type) { case kCheckpoint: { - if (task.db < 0 || task.db >= dbNum_) { - WARN("The database index is out of range."); + if (auto s = task.args.find(kCheckpointPath); s == task.args.end()) { + WARN("The critical parameter 'path' is missing for do a checkpoint."); return; } - auto& db = backends_[task.db]; + auto path = task.args.find(kCheckpointPath)->second; + trimSlash(path); + db->CreateCheckpoint(path, task.sync); + break; + } + case kLoadDBFromCheckPoint: { if (auto s = 
-          WARN("The critical parameter 'path' is missing in the checkpoint.");
+          WARN("The critical parameter 'path' is missing to load a checkpoint.");
           return;
         }
         auto path = task.args.find(kCheckpointPath)->second;
         trimSlash(path);
-        db->CreateCheckpoint(path);
+        db->LoadDBFromCheckPoint(path, task.sync);
         break;
       }
-
       default:
         break;
     }
   });
 }
 
-void PStore::WaitForCheckpointDone() {
-  for (auto& db : backends_) {
-    db->WaitForCheckpointDone();
-  }
-}
-
 void PStore::trimSlash(std::string& dirName) {
   while (dirName.back() == '/') {
     dirName.pop_back();
diff --git a/src/store.h b/src/store.h
index 3c60aee09..2fd207474 100644
--- a/src/store.h
+++ b/src/store.h
@@ -17,21 +17,21 @@
 
 namespace pikiwidb {
 
-enum TaskType {
-  kCheckpoint,
-};
+enum TaskType { kCheckpoint = 0, kLoadDBFromCheckPoint };
 
 enum TaskArg {
-  kCheckpointPath,
+  kCheckpointPath = 0,
 };
 
 struct TaskContext {
   TaskType type;
   int db;
   std::map<TaskArg, std::string> args;
-  TaskContext(TaskType t) : type(t) {}
-  TaskContext(TaskType t, int d) : type(t), db(d) {}
-  TaskContext(TaskType t, int d, const std::map<TaskArg, std::string>& a) : type(t), db(d), args(a) {}
+  bool sync;
+  TaskContext(TaskType t, bool s = false) : type(t), sync(s) {}
+  TaskContext(TaskType t, int d, bool s = false) : type(t), db(d), sync(s) {}
+  TaskContext(TaskType t, int d, const std::map<TaskArg, std::string>& a, bool s = false)
+      : type(t), db(d), args(a), sync(s) {}
 };
 
 using TasksVector = std::vector<TaskContext>;