From c0811dc3961ed881276ee41780d47767a0c575f5 Mon Sep 17 00:00:00 2001 From: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Date: Wed, 13 Apr 2022 10:14:12 +0800 Subject: [PATCH] fix disk fault recovery (#4131) * fix disk fault recovery * add ut Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/kvstore/DiskManager.cpp | 4 +- src/kvstore/NebulaStore.cpp | 36 +++++++++--- src/kvstore/NebulaStore.h | 1 + src/kvstore/PartManager.h | 1 + src/kvstore/test/NebulaStoreTest.cpp | 87 ++++++++++++++++++++++++++++ 5 files changed, 121 insertions(+), 8 deletions(-) diff --git a/src/kvstore/DiskManager.cpp b/src/kvstore/DiskManager.cpp index e04039079e8..30e58c5bca6 100644 --- a/src/kvstore/DiskManager.cpp +++ b/src/kvstore/DiskManager.cpp @@ -23,7 +23,9 @@ DiskManager::DiskManager(const std::vector& dataPaths, for (const auto& path : dataPaths) { auto absolute = boost::filesystem::absolute(path); if (!boost::filesystem::exists(absolute)) { - boost::filesystem::create_directories(absolute); + if (!boost::filesystem::create_directories(absolute)) { + LOG(FATAL) << folly::sformat("DataPath:{} does not exist, create failed.", path); + } } else if (!boost::filesystem::is_directory(absolute)) { LOG(FATAL) << "DataPath is not a valid directory: " << path; } diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index c5766d582bb..04e6a911428 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -245,7 +245,12 @@ void NebulaStore::loadPartFromDataPath() { folly::RWSpinLock::WriteHolder holder(&lock_); auto iter = spaces_.find(spaceId); CHECK(iter != spaces_.end()); - iter->second->parts_.emplace(partId, part); + // Check if part already exists. + // Prevent the same part from existing on different dataPaths. + auto ret = iter->second->parts_.emplace(partId, part); + if (!ret.second) { + LOG(FATAL) << "Part already exists, partId " << partId; + } } counter.fetch_sub(1); if (counter.load() == 0) { @@ -359,14 +364,31 @@ ErrorOr NebulaStore::partLeader(GraphSpaceID void NebulaStore::addSpace(GraphSpaceID spaceId, bool isListener) { folly::RWSpinLock::WriteHolder wh(&lock_); if (!isListener) { + // Iterate over all engines to ensure that each dataPath has an engine if (this->spaces_.find(spaceId) != this->spaces_.end()) { LOG(INFO) << "Data space " << spaceId << " has existed!"; - return; - } - LOG(INFO) << "Create data space " << spaceId; - this->spaces_[spaceId] = std::make_unique(); - for (auto& path : options_.dataPaths_) { - this->spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); + for (auto& path : options_.dataPaths_) { + bool engineExist = false; + auto dataPath = folly::stringPrintf("%s/nebula/%d", path.c_str(), spaceId); + for (auto iter = spaces_[spaceId]->engines_.begin(); + iter != spaces_[spaceId]->engines_.end(); + iter++) { + auto dPath = (*iter)->getDataRoot(); + if (dataPath.compare(dPath) == 0) { + engineExist = true; + break; + } + } + if (!engineExist) { + spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); + } + } + } else { + LOG(INFO) << "Create data space " << spaceId; + this->spaces_[spaceId] = std::make_unique(); + for (auto& path : options_.dataPaths_) { + this->spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_)); + } } } else { // listener don't need engine for now diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 1fadcb3e827..89c12d424cb 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -48,6 +48,7 @@ struct SpaceListenerInfo { */ class NebulaStore : public KVStore, public Handler { FRIEND_TEST(NebulaStoreTest, SimpleTest); + FRIEND_TEST(NebulaStoreTest, MultiPathTest); FRIEND_TEST(NebulaStoreTest, PartsTest); FRIEND_TEST(NebulaStoreTest, PersistPeersTest); FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest); diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index f89e143e083..34d3a57befa 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -213,6 +213,7 @@ class PartManager { */ class MemPartManager final : public PartManager { FRIEND_TEST(NebulaStoreTest, SimpleTest); + FRIEND_TEST(NebulaStoreTest, MultiPathTest); FRIEND_TEST(NebulaStoreTest, PartsTest); FRIEND_TEST(NebulaStoreTest, PersistPeersTest); FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest); diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index c8abdcb7171..08fbddd9243 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -133,6 +133,93 @@ TEST(NebulaStoreTest, SimpleTest) { EXPECT_EQ(100, num); } +TEST(NebulaStoreTest, MultiPathTest) { + fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX"); + + { + auto partMan = std::make_unique(); + auto ioThreadPool = std::make_shared(4); + // GraphSpaceID => {PartitionIDs} + // 1 => {0, 1, 2, 3, 4, 5} + for (auto spaceId = 1; spaceId <= 1; spaceId++) { + for (auto partId = 0; partId < 6; partId++) { + partMan->partsMap_[spaceId][partId] = PartHosts(); + } + } + + VLOG(1) << "Total space num is " << partMan->partsMap_.size() + << ", total local partitions num is " << partMan->parts(HostAddr("", 0)).size(); + + std::vector paths; + paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath.path())); + paths.emplace_back(folly::stringPrintf("%s/disk2", rootPath.path())); + + KVOptions options; + options.dataPaths_ = std::move(paths); + options.partMan_ = std::move(partMan); + HostAddr local = {"", 0}; + auto store = + std::make_unique(std::move(options), ioThreadPool, local, getHandlers()); + store->init(); + sleep(1); + EXPECT_EQ(1, store->spaces_.size()); + + EXPECT_EQ(6, store->spaces_[1]->parts_.size()); + EXPECT_EQ(2, store->spaces_[1]->engines_.size()); + EXPECT_EQ(folly::stringPrintf("%s/disk1/nebula/1", rootPath.path()), + store->spaces_[1]->engines_[0]->getDataRoot()); + EXPECT_EQ(folly::stringPrintf("%s/disk2/nebula/1", rootPath.path()), + store->spaces_[1]->engines_[1]->getDataRoot()); + + EXPECT_EQ(3, store->spaces_[1]->engines_[0]->allParts().size()); + EXPECT_EQ(3, store->spaces_[1]->engines_[1]->allParts().size()); + + store->stop(); + sleep(10); + } + + // change paths + { + auto partMan = std::make_unique(); + auto ioThreadPool = std::make_shared(4); + // GraphSpaceID => {PartitionIDs} + // 1 => {0, 1, 2, 3, 4, 5} + for (auto spaceId = 1; spaceId <= 1; spaceId++) { + for (auto partId = 0; partId < 6; partId++) { + partMan->partsMap_[spaceId][partId] = PartHosts(); + } + } + + VLOG(1) << "Total space num is " << partMan->partsMap_.size() + << ", total local partitions num is " << partMan->parts(HostAddr("", 0)).size(); + + std::vector paths; + paths.emplace_back(folly::stringPrintf("%s/disk2", rootPath.path())); + paths.emplace_back(folly::stringPrintf("%s/disk3", rootPath.path())); + + KVOptions options; + options.dataPaths_ = std::move(paths); + options.partMan_ = std::move(partMan); + HostAddr local = {"", 0}; + auto store = + std::make_unique(std::move(options), ioThreadPool, local, getHandlers()); + store->init(); + sleep(1); + + EXPECT_EQ(1, store->spaces_.size()); + + EXPECT_EQ(6, store->spaces_[1]->parts_.size()); + EXPECT_EQ(2, store->spaces_[1]->engines_.size()); + EXPECT_EQ(folly::stringPrintf("%s/disk2/nebula/1", rootPath.path()), + store->spaces_[1]->engines_[0]->getDataRoot()); + EXPECT_EQ(folly::stringPrintf("%s/disk3/nebula/1", rootPath.path()), + store->spaces_[1]->engines_[1]->getDataRoot()); + + EXPECT_EQ(3, store->spaces_[1]->engines_[0]->allParts().size()); + EXPECT_EQ(3, store->spaces_[1]->engines_[1]->allParts().size()); + } +} + TEST(NebulaStoreTest, PartsTest) { fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX"); auto ioThreadPool = std::make_shared(4);