Skip to content

Commit

Permalink
fix disk fault recovery (#4131)
Browse files Browse the repository at this point in the history
* fix disk fault recovery

* add ut

Co-authored-by: Sophie <[email protected]>
  • Loading branch information
panda-sheep and Sophie-Xie authored Apr 13, 2022
1 parent 03605d0 commit c0811dc
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/kvstore/DiskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ DiskManager::DiskManager(const std::vector<std::string>& dataPaths,
for (const auto& path : dataPaths) {
auto absolute = boost::filesystem::absolute(path);
if (!boost::filesystem::exists(absolute)) {
boost::filesystem::create_directories(absolute);
if (!boost::filesystem::create_directories(absolute)) {
LOG(FATAL) << folly::sformat("DataPath:{} does not exist, create failed.", path);
}
} else if (!boost::filesystem::is_directory(absolute)) {
LOG(FATAL) << "DataPath is not a valid directory: " << path;
}
Expand Down
36 changes: 29 additions & 7 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ void NebulaStore::loadPartFromDataPath() {
folly::RWSpinLock::WriteHolder holder(&lock_);
auto iter = spaces_.find(spaceId);
CHECK(iter != spaces_.end());
iter->second->parts_.emplace(partId, part);
// Check if part already exists.
// Prevent the same part from existing on different dataPaths.
auto ret = iter->second->parts_.emplace(partId, part);
if (!ret.second) {
LOG(FATAL) << "Part already exists, partId " << partId;
}
}
counter.fetch_sub(1);
if (counter.load() == 0) {
Expand Down Expand Up @@ -359,14 +364,31 @@ ErrorOr<nebula::cpp2::ErrorCode, HostAddr> NebulaStore::partLeader(GraphSpaceID
void NebulaStore::addSpace(GraphSpaceID spaceId, bool isListener) {
folly::RWSpinLock::WriteHolder wh(&lock_);
if (!isListener) {
// Iterate over all engines to ensure that each dataPath has an engine
if (this->spaces_.find(spaceId) != this->spaces_.end()) {
LOG(INFO) << "Data space " << spaceId << " has existed!";
return;
}
LOG(INFO) << "Create data space " << spaceId;
this->spaces_[spaceId] = std::make_unique<SpacePartInfo>();
for (auto& path : options_.dataPaths_) {
this->spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_));
for (auto& path : options_.dataPaths_) {
bool engineExist = false;
auto dataPath = folly::stringPrintf("%s/nebula/%d", path.c_str(), spaceId);
for (auto iter = spaces_[spaceId]->engines_.begin();
iter != spaces_[spaceId]->engines_.end();
iter++) {
auto dPath = (*iter)->getDataRoot();
if (dataPath.compare(dPath) == 0) {
engineExist = true;
break;
}
}
if (!engineExist) {
spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_));
}
}
} else {
LOG(INFO) << "Create data space " << spaceId;
this->spaces_[spaceId] = std::make_unique<SpacePartInfo>();
for (auto& path : options_.dataPaths_) {
this->spaces_[spaceId]->engines_.emplace_back(newEngine(spaceId, path, options_.walPath_));
}
}
} else {
// listener don't need engine for now
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct SpaceListenerInfo {
*/
class NebulaStore : public KVStore, public Handler {
FRIEND_TEST(NebulaStoreTest, SimpleTest);
FRIEND_TEST(NebulaStoreTest, MultiPathTest);
FRIEND_TEST(NebulaStoreTest, PartsTest);
FRIEND_TEST(NebulaStoreTest, PersistPeersTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class PartManager {
*/
class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, SimpleTest);
FRIEND_TEST(NebulaStoreTest, MultiPathTest);
FRIEND_TEST(NebulaStoreTest, PartsTest);
FRIEND_TEST(NebulaStoreTest, PersistPeersTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
Expand Down
87 changes: 87 additions & 0 deletions src/kvstore/test/NebulaStoreTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,93 @@ TEST(NebulaStoreTest, SimpleTest) {
EXPECT_EQ(100, num);
}

TEST(NebulaStoreTest, MultiPathTest) {
fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX");

{
auto partMan = std::make_unique<MemPartManager>();
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
// GraphSpaceID => {PartitionIDs}
// 1 => {0, 1, 2, 3, 4, 5}
for (auto spaceId = 1; spaceId <= 1; spaceId++) {
for (auto partId = 0; partId < 6; partId++) {
partMan->partsMap_[spaceId][partId] = PartHosts();
}
}

VLOG(1) << "Total space num is " << partMan->partsMap_.size()
<< ", total local partitions num is " << partMan->parts(HostAddr("", 0)).size();

std::vector<std::string> paths;
paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath.path()));
paths.emplace_back(folly::stringPrintf("%s/disk2", rootPath.path()));

KVOptions options;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::move(partMan);
HostAddr local = {"", 0};
auto store =
std::make_unique<NebulaStore>(std::move(options), ioThreadPool, local, getHandlers());
store->init();
sleep(1);
EXPECT_EQ(1, store->spaces_.size());

EXPECT_EQ(6, store->spaces_[1]->parts_.size());
EXPECT_EQ(2, store->spaces_[1]->engines_.size());
EXPECT_EQ(folly::stringPrintf("%s/disk1/nebula/1", rootPath.path()),
store->spaces_[1]->engines_[0]->getDataRoot());
EXPECT_EQ(folly::stringPrintf("%s/disk2/nebula/1", rootPath.path()),
store->spaces_[1]->engines_[1]->getDataRoot());

EXPECT_EQ(3, store->spaces_[1]->engines_[0]->allParts().size());
EXPECT_EQ(3, store->spaces_[1]->engines_[1]->allParts().size());

store->stop();
sleep(10);
}

// change paths
{
auto partMan = std::make_unique<MemPartManager>();
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
// GraphSpaceID => {PartitionIDs}
// 1 => {0, 1, 2, 3, 4, 5}
for (auto spaceId = 1; spaceId <= 1; spaceId++) {
for (auto partId = 0; partId < 6; partId++) {
partMan->partsMap_[spaceId][partId] = PartHosts();
}
}

VLOG(1) << "Total space num is " << partMan->partsMap_.size()
<< ", total local partitions num is " << partMan->parts(HostAddr("", 0)).size();

std::vector<std::string> paths;
paths.emplace_back(folly::stringPrintf("%s/disk2", rootPath.path()));
paths.emplace_back(folly::stringPrintf("%s/disk3", rootPath.path()));

KVOptions options;
options.dataPaths_ = std::move(paths);
options.partMan_ = std::move(partMan);
HostAddr local = {"", 0};
auto store =
std::make_unique<NebulaStore>(std::move(options), ioThreadPool, local, getHandlers());
store->init();
sleep(1);

EXPECT_EQ(1, store->spaces_.size());

EXPECT_EQ(6, store->spaces_[1]->parts_.size());
EXPECT_EQ(2, store->spaces_[1]->engines_.size());
EXPECT_EQ(folly::stringPrintf("%s/disk2/nebula/1", rootPath.path()),
store->spaces_[1]->engines_[0]->getDataRoot());
EXPECT_EQ(folly::stringPrintf("%s/disk3/nebula/1", rootPath.path()),
store->spaces_[1]->engines_[1]->getDataRoot());

EXPECT_EQ(3, store->spaces_[1]->engines_[0]->allParts().size());
EXPECT_EQ(3, store->spaces_[1]->engines_[1]->allParts().size());
}
}

TEST(NebulaStoreTest, PartsTest) {
fs::TempDir rootPath("/tmp/nebula_store_test.XXXXXX");
auto ioThreadPool = std::make_shared<folly::IOThreadPoolExecutor>(4);
Expand Down

0 comments on commit c0811dc

Please sign in to comment.