From c21cad48243ebbdbc2ca98253ea50569352c7391 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 23 Dec 2021 18:29:33 +0800 Subject: [PATCH 1/3] use rcu replace thread local fix storage exit crash format address some comment --- src/clients/meta/MetaClient.cpp | 463 +++++++++++++++-------------- src/clients/meta/MetaClient.h | 16 +- src/storage/test/ChainTestUtils.h | 6 +- src/storage/test/KillQueryTest.cpp | 2 +- 4 files changed, 255 insertions(+), 232 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 3d08c9cfe18..9d6232c3730 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -64,14 +64,15 @@ DEFINE_validator(failed_login_attempts, &ValidateFailedLoginAttempts); namespace nebula { namespace meta { +Indexes buildIndexes(std::vector indexItemVec); + MetaClient::MetaClient(std::shared_ptr ioThreadPool, std::vector addrs, const MetaClientOptions& options) : ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options), - sessionMap_(new SessionMap{}), - killedPlans_(new folly::F14FastSet>{}) { + metadata_(new MetaData()) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified or can be solved. Meta server is required"; @@ -87,8 +88,7 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool MetaClient::~MetaClient() { stop(); - delete sessionMap_.load(); - delete killedPlans_.load(); + delete metadata_.load(); VLOG(3) << "~MetaClient"; } @@ -188,35 +188,33 @@ bool MetaClient::loadUsersAndRoles() { userPasswordMap[user.first] = user.second; userNameList.emplace(user.first); } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - userRolesMap_ = std::move(userRolesMap); - userPasswordMap_ = std::move(userPasswordMap); - - // Remove expired users from cache - auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, - const std::unordered_set& userList) { - for (auto& ele : userMap) { - if (userList.count(ele.first) == 0) { - userMap.erase(ele.first); - } - } - }; - removeExpiredUser(userPasswordAttemptsRemain_, userNameList); - removeExpiredUser(userLoginLockTime_, userNameList); - - // This method is called periodically by the heartbeat thread, but we don't want to reset the - // failed login attempts every time. - for (const auto& user : userNameList) { - // If the user is not in the map, insert value with the default value - // Do nothing if the account is already in the map - if (userPasswordAttemptsRemain_.find(user) == userPasswordAttemptsRemain_.end()) { - userPasswordAttemptsRemain_.insert(user, FLAGS_failed_login_attempts); - } - if (userLoginLockTime_.find(user) == userLoginLockTime_.end()) { - userLoginLockTime_.insert(user, 0); + + userRolesMap_ = std::move(userRolesMap); + userPasswordMap_ = std::move(userPasswordMap); + + // Remove expired users from cache + auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, + const std::unordered_set& userList) { + for (auto& ele : userMap) { + if (userList.count(ele.first) == 0) { + userMap.erase(ele.first); } } + }; + removeExpiredUser(userPasswordAttemptsRemain_, userNameList); + removeExpiredUser(userLoginLockTime_, userNameList); + + // This method is called periodically by the heartbeat thread, but we don't want to reset the + // failed login attempts every time. + for (const auto& user : userNameList) { + // If the user is not in the map, insert value with the default value + // Do nothing if the account is already in the map + if (userPasswordAttemptsRemain_.find(user) == userPasswordAttemptsRemain_.end()) { + userPasswordAttemptsRemain_.insert(user, FLAGS_failed_login_attempts); + } + if (userLoginLockTime_.find(user) == userLoginLockTime_.end()) { + userLoginLockTime_.insert(user, 0); + } } return true; } @@ -338,7 +336,6 @@ bool MetaClient::loadData() { decltype(localCache_) oldCache; { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); oldCache = std::move(localCache_); localCache_ = std::move(cache); spaceIndexByName_ = std::move(spaceIndexByName); @@ -353,7 +350,37 @@ bool MetaClient::loadData() { } localDataLastUpdateTime_.store(metadLastUpdateTime_.load()); - + auto newMetaData = new MetaData(); + + for (auto& spaceInfo : localCache_) { + GraphSpaceID spaceId = spaceInfo.first; + std::shared_ptr info = spaceInfo.second; + std::shared_ptr infoDeepCopy = std::make_shared(*info); + infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); + infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); + newMetaData->localCache_[spaceId] = infoDeepCopy; + } + newMetaData->spaceIndexByName_ = spaceIndexByName_; + newMetaData->spaceTagIndexByName_ = spaceTagIndexByName_; + newMetaData->spaceEdgeIndexByName_ = spaceEdgeIndexByName_; + newMetaData->spaceEdgeIndexByType_ = spaceEdgeIndexByType_; + newMetaData->spaceNewestTagVerMap_ = spaceNewestTagVerMap_; + newMetaData->spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; + newMetaData->spaceTagIndexById_ = spaceTagIndexById_; + newMetaData->spaceAllEdgeMap_ = spaceAllEdgeMap_; + + newMetaData->userRolesMap_ = userRolesMap_; + newMetaData->storageHosts_ = storageHosts_; + newMetaData->fulltextIndexMap_ = fulltextIndexMap_; + newMetaData->userPasswordMap_ = userPasswordMap_; + newMetaData->sessionMap_ = std::move(sessionMap_); + newMetaData->killedPlans_ = std::move(killedPlans_); + newMetaData->fulltextClientList_ = std::move(fulltextClientList_); + auto oldMetaData = metadata_.load(); + metadata_.store(newMetaData); + folly::rcu_retire(oldMetaData); diff(oldCache, localCache_); listenerDiff(oldCache, localCache_); loadRemoteListeners(); @@ -507,7 +534,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, return true; } -static Indexes buildIndexes(std::vector indexItemVec) { +Indexes buildIndexes(std::vector indexItemVec) { Indexes indexes; for (auto index : indexItemVec) { auto indexName = index.get_index_name(); @@ -576,10 +603,7 @@ bool MetaClient::loadGlobalServiceClients() { LOG(ERROR) << "List services failed, status:" << ret.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - serviceClientList_ = std::move(ret).value(); - } + serviceClientList_ = std::move(ret).value(); return true; } @@ -589,53 +613,15 @@ bool MetaClient::loadFulltextIndexes() { LOG(ERROR) << "List fulltext indexes failed, status:" << ftRet.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - fulltextIndexMap_ = std::move(ftRet).value(); - } + fulltextIndexMap_ = std::move(ftRet).value(); return true; } -const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { - ThreadLocalInfo& threadLocalInfo = folly::SingletonThreadLocal::get(); - - if (threadLocalInfo.localLastUpdateTime_ < localDataLastUpdateTime_) { - threadLocalInfo.localLastUpdateTime_ = localDataLastUpdateTime_; - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (auto& spaceInfo : localCache_) { - GraphSpaceID spaceId = spaceInfo.first; - std::shared_ptr info = spaceInfo.second; - std::shared_ptr infoDeepCopy = std::make_shared(*info); - infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->edgeSchemas_ = - buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); - infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); - threadLocalInfo.localCache_[spaceId] = infoDeepCopy; - } - threadLocalInfo.spaceIndexByName_ = spaceIndexByName_; - threadLocalInfo.spaceTagIndexByName_ = spaceTagIndexByName_; - threadLocalInfo.spaceEdgeIndexByName_ = spaceEdgeIndexByName_; - threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; - threadLocalInfo.spaceNewestTagVerMap_ = spaceNewestTagVerMap_; - threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; - threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; - threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; - - threadLocalInfo.userRolesMap_ = userRolesMap_; - threadLocalInfo.storageHosts_ = storageHosts_; - threadLocalInfo.fulltextIndexMap_ = fulltextIndexMap_; - threadLocalInfo.userPasswordMap_ = userPasswordMap_; - } - - return threadLocalInfo; -} - Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->tagIndexes_.find(indexID); if (indexIt != it->second->tagIndexes_.end()) { return Status::OK(); @@ -647,9 +633,10 @@ Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { } Status MetaClient::checkEdgeIndexed(GraphSpaceID space, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(space); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(space); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->edgeIndexes_.find(indexID); if (indexIt != it->second->edgeIndexes_.end()) { return Status::OK(); @@ -1329,9 +1316,10 @@ StatusOr MetaClient::getSpaceIdByNameFromCache(const std::string& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceIndexByName_.find(name); - if (it != threadLocalInfo.spaceIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceIndexByName_.find(name); + if (it != metadata.spaceIndexByName_.end()) { return it->second; } return Status::SpaceNotFound(); @@ -1341,9 +1329,10 @@ StatusOr MetaClient::getSpaceNameByIdFromCache(GraphSpaceID spaceId if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1355,9 +1344,10 @@ StatusOr MetaClient::getTagIDByNameFromCache(const GraphSpaceID& space, if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceTagIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceTagIndexByName_.end()) { return Status::Error("TagName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1368,9 +1358,10 @@ StatusOr MetaClient::getTagNameByIdFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexById_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceTagIndexById_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexById_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceTagIndexById_.end()) { return Status::Error("TagID `%d' is nonexistent", tagId); } return it->second; @@ -1381,9 +1372,10 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceEdgeIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceEdgeIndexByName_.end()) { return Status::Error("EdgeName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1394,9 +1386,10 @@ StatusOr MetaClient::getEdgeNameByTypeFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceEdgeIndexByType_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceEdgeIndexByType_.end()) { return Status::Error("EdgeType `%d' is nonexistent", edgeType); } return it->second; @@ -1406,9 +1399,10 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceAllEdgeMap_.find(space); - if (it == threadLocalInfo.spaceAllEdgeMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceAllEdgeMap_.find(space); + if (it == metadata.spaceAllEdgeMap_.end()) { return Status::Error("SpaceId `%d' is nonexistent", space); } return it->second; @@ -1543,14 +1537,16 @@ folly::Future> MetaClient::removeRange(std::string segment, } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetPartsMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetPartsMap(host, metadata.localCache_); } StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } auto& cache = it->second; @@ -1568,9 +1564,10 @@ StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, Part Status MetaClient::checkPartExistInCache(const HostAddr& host, GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end()) { for (auto& pId : partsIt->second) { @@ -1587,9 +1584,10 @@ Status MetaClient::checkPartExistInCache(const HostAddr& host, } Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end() && !partsIt->second.empty()) { return Status::OK(); @@ -1601,9 +1599,10 @@ Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spa } StatusOr MetaClient::partsNum(GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } return it->second->partsAlloc_.size(); @@ -1994,9 +1993,10 @@ StatusOr MetaClient::getSpaceVidLen(const GraphSpaceID& spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -2012,9 +2012,10 @@ StatusOr MetaClient::getSpaceVidType(const GraphSpac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -2033,9 +2034,10 @@ StatusOr MetaClient::getSpaceDesc(const GraphSpaceID& space) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(space); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(space); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << space << " not found!"; return Status::Error("Space %d not found", space); } @@ -2056,9 +2058,10 @@ StatusOr> MetaClient::getTagSchemaFr if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto tagIt = spaceIt->second->tagSchemas_.find(tagID); if (tagIt != spaceIt->second->tagSchemas_.end() && !tagIt->second.empty()) { size_t vNum = tagIt->second.size(); @@ -2076,9 +2079,10 @@ StatusOr> MetaClient::getEdgeSchemaF if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto edgeIt = spaceIt->second->edgeSchemas_.find(edgeType); if (edgeIt != spaceIt->second->edgeSchemas_.end() && !edgeIt->second.empty()) { size_t vNum = edgeIt->second.size(); @@ -2095,9 +2099,10 @@ StatusOr MetaClient::getAllVerTagSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->tagSchemas_; @@ -2107,9 +2112,10 @@ StatusOr MetaClient::getAllLatestVerTagSchema(const GraphSpaceID& spa if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } TagSchema tagsSchema; @@ -2125,9 +2131,10 @@ StatusOr MetaClient::getAllVerEdgeSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->edgeSchemas_; @@ -2137,9 +2144,10 @@ StatusOr MetaClient::getAllLatestVerEdgeSchemaFromCache(const GraphS if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } EdgeSchema edgesSchema; @@ -2227,9 +2235,10 @@ StatusOr> MetaClient::getTagIndexFromCache(Grap return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2264,9 +2273,10 @@ StatusOr> MetaClient::getEdgeIndexFromCache(Gra return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2301,9 +2311,10 @@ StatusOr>> MetaClient::getTagIndexe return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2324,9 +2335,10 @@ StatusOr>> MetaClient::getEdgeIndex return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2400,9 +2412,10 @@ std::vector MetaClient::getRolesByUserFromCache(const std::strin if (!ready_) { return std::vector(0); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userRolesMap_.find(user); - if (iter == threadLocalInfo.userRolesMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userRolesMap_.find(user); + if (iter == metadata.userRolesMap_.end()) { return std::vector(0); } return iter->second; @@ -2413,14 +2426,12 @@ Status MetaClient::authCheckFromCache(const std::string& account, const std::str if (!ready_) { return Status::Error("Meta Service not ready"); } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - // Check user existence - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter == threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter == metadata.userPasswordMap_.end()) { return Status::Error("User not exist"); } - auto lockedSince = userLoginLockTime_[account]; auto passwordAttemtRemain = userPasswordAttemptsRemain_[account]; @@ -2481,18 +2492,20 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) { if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter != threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter != metadata.userPasswordMap_.end()) { return true; } return false; } StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceInfo = threadLocalInfo.localCache_.find(spaceId); - if (spaceInfo == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceInfo = metadata.localCache_.find(spaceId); + if (spaceInfo == metadata.localCache_.end()) { return Status::Error("Term not found!"); } @@ -2509,8 +2522,9 @@ StatusOr> MetaClient::getStorageHosts() { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.storageHosts_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.storageHosts_; } StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& space, @@ -2518,9 +2532,10 @@ StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceNewestTagVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceNewestTagVerMap_.end()) { return Status::TagNotFound(); } return it->second; @@ -2531,9 +2546,10 @@ StatusOr MetaClient::getLatestEdgeVersionFromCache(const GraphSpaceID if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceNewestEdgeVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceNewestEdgeVerMap_.end()) { return Status::EdgeNotFound(); } return it->second; @@ -2987,9 +3003,10 @@ MetaClient::getListenersBySpaceHostFromCache(GraphSpaceID spaceId, const HostAdd if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3006,8 +3023,9 @@ StatusOr MetaClient::getListenersByHostFromCache(const HostAddr& h if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetListenersMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetListenersMap(host, metadata.localCache_); } ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCache& localCache) { @@ -3043,9 +3061,10 @@ StatusOr MetaClient::getListenerHostsBySpacePartType(GraphSpaceID spac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3064,9 +3083,10 @@ StatusOr> MetaClient::getListenerHostTypeBySpace if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3155,8 +3175,9 @@ void MetaClient::updateNestedGflags(const std::unordered_map optionMap.emplace(value.first, value.second.toString()); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (const auto& spaceEntry : threadLocalInfo.localCache_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (const auto& spaceEntry : metadata.localCache_) { listener_->onSpaceOptionUpdated(spaceEntry.first, optionMap); } } @@ -3443,11 +3464,11 @@ StatusOr> MetaClient::getServiceClientsFromCach if (!ready_) { return Status::Error("Not ready!"); } - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); if (type == cpp2::ExternalServiceType::ELASTICSEARCH) { - auto sIter = serviceClientList_.find(type); - if (sIter != serviceClientList_.end()) { + auto sIter = metadata.serviceClientList_.find(type); + if (sIter != metadata.serviceClientList_.end()) { return sIter->second; } } @@ -3509,8 +3530,9 @@ StatusOr> MetaClient::getFTIndexe if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.fulltextIndexMap_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.fulltextIndexMap_; } StatusOr> MetaClient::getFTIndexBySpaceFromCache( @@ -3518,9 +3540,10 @@ StatusOr> MetaClient::getFTIndexB if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); std::unordered_map indexes; - for (const auto& it : threadLocalInfo.fulltextIndexMap_) { + for (const auto& it : metadata.fulltextIndexMap_) { if (it.second.get_space_id() == spaceId) { indexes[it.first] = it.second; } @@ -3533,8 +3556,9 @@ StatusOr> MetaClient::getFTIndexBySpaceSch if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (auto& it : threadLocalInfo.fulltextIndexMap_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (auto& it : metadata.fulltextIndexMap_) { auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type ? it.second.get_depend_schema().get_edge_type() : it.second.get_depend_schema().get_tag_id(); @@ -3550,12 +3574,13 @@ StatusOr MetaClient::getFTIndexByNameFromCache(GraphSpaceID space if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - if (threadLocalInfo.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && - threadLocalInfo.fulltextIndexMap_.at(name).get_space_id() != spaceId) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + if (metadata.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && + metadata.fulltextIndexMap_.at(name).get_space_id() != spaceId) { return Status::IndexNotFound(); } - return threadLocalInfo.fulltextIndexMap_.at(name); + return metadata.fulltextIndexMap_.at(name); } folly::Future> MetaClient::createSession( @@ -3707,22 +3732,16 @@ bool MetaClient::loadSessions() { LOG(ERROR) << "List sessions failed, status:" << session_list.status(); return false; } - SessionMap* oldSessionMap = sessionMap_.load(); - SessionMap* newSessionMap = new SessionMap(*oldSessionMap); - auto oldKilledPlan = killedPlans_.load(); - auto newKilledPlan = new folly::F14FastSet>(*oldKilledPlan); + sessionMap_.clear(); + killedPlans_.clear(); for (auto& session : session_list.value().get_sessions()) { - (*newSessionMap)[session.get_session_id()] = session; + sessionMap_[session.get_session_id()] = session; for (auto& query : session.get_queries()) { if (query.second.get_status() == cpp2::QueryStatus::KILLING) { - newKilledPlan->insert({session.get_session_id(), query.first}); + killedPlans_.insert({session.get_session_id(), query.first}); } } } - sessionMap_.store(newSessionMap); - killedPlans_.store(newKilledPlan); - folly::rcu_retire(oldKilledPlan); - folly::rcu_retire(oldSessionMap); return true; } @@ -3731,9 +3750,9 @@ StatusOr MetaClient::getSessionFromCache(const nebula::SessionID& return Status::Error("Not ready!"); } folly::rcu_reader guard; - auto session_map = sessionMap_.load(); - auto it = session_map->find(session_id); - if (it != session_map->end()) { + auto& sessionMap = metadata_.load()->sessionMap_; + auto it = sessionMap.find(session_id); + if (it != sessionMap.end()) { return it->second; } return Status::SessionNotFound(); @@ -3747,7 +3766,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) return false; } folly::rcu_reader guard; - return killedPlans_.load()->count({sessionId, planId}); + return metadata_.load()->killedPlans_.count({sessionId, planId}); } Status MetaClient::verifyVersion() { diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index 22814b05fb0..aa4c00cdfb1 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -790,7 +790,8 @@ class MetaClient { // Only report dir info once when started bool dirInfoReported_ = false; - struct ThreadLocalInfo { + + struct MetaData { int64_t localLastUpdateTime_{-2}; LocalCache localCache_; SpaceNameIdMap spaceIndexByName_; @@ -806,9 +807,12 @@ class MetaClient { std::vector storageHosts_; FTIndexMap fulltextIndexMap_; UserPasswordMap userPasswordMap_; - }; - const ThreadLocalInfo& getThreadLocalInfo(); + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + + ServiceClientsList serviceClientList_; + }; void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool); @@ -837,7 +841,6 @@ class MetaClient { ServiceClientsList serviceClientList_; FTIndexMap fulltextIndexMap_; - mutable folly::RWSpinLock localCacheLock_; // The listener_ is the NebulaStore MetaChangedListener* listener_{nullptr}; // The lock used to protect listener_ @@ -855,8 +858,9 @@ class MetaClient { MetaClientOptions options_; std::vector storageHosts_; int64_t heartbeatTime_; - std::atomic sessionMap_; - std::atomic>*> killedPlans_; + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + std::atomic metadata_; }; } // namespace meta diff --git a/src/storage/test/ChainTestUtils.h b/src/storage/test/ChainTestUtils.h index d94f30b2a74..0fd04ca00ee 100644 --- a/src/storage/test/ChainTestUtils.h +++ b/src/storage/test/ChainTestUtils.h @@ -226,14 +226,14 @@ class MetaClientTestUpdater { static void addLocalCache(meta::MetaClient& mClient, GraphSpaceID spaceId, std::shared_ptr spInfoCache) { - mClient.localCache_[spaceId] = spInfoCache; + mClient.metadata_.load()->localCache_[spaceId] = spInfoCache; } static meta::SpaceInfoCache* getLocalCache(meta::MetaClient* mClient, GraphSpaceID spaceId) { - if (mClient->localCache_.count(spaceId) == 0) { + if (mClient->metadata_.load()->localCache_.count(spaceId) == 0) { return nullptr; } - return mClient->localCache_[spaceId].get(); + return mClient->metadata_.load()->localCache_[spaceId].get(); } static void addPartTerm(meta::MetaClient* mClient, diff --git a/src/storage/test/KillQueryTest.cpp b/src/storage/test/KillQueryTest.cpp index 815ddad74a5..8516244c5bb 100644 --- a/src/storage/test/KillQueryTest.cpp +++ b/src/storage/test/KillQueryTest.cpp @@ -19,7 +19,7 @@ class KillQueryMetaWrapper { public: explicit KillQueryMetaWrapper(MetaClient* client) : client_(client) {} void killQuery(SessionID session_id, ExecutionPlanID plan_id) { - client_->killedPlans_.load()->emplace(session_id, plan_id); + client_->metadata_.load()->killedPlans_.emplace(session_id, plan_id); } private: From 0a60c718a26fdd616cddc3a0b2a93e628dbe39b8 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 30 Dec 2021 19:18:40 +0800 Subject: [PATCH 2/3] fix bug --- src/clients/meta/MetaClient.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 9d6232c3730..21da524c4dd 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -377,7 +377,7 @@ bool MetaClient::loadData() { newMetaData->userPasswordMap_ = userPasswordMap_; newMetaData->sessionMap_ = std::move(sessionMap_); newMetaData->killedPlans_ = std::move(killedPlans_); - newMetaData->fulltextClientList_ = std::move(fulltextClientList_); + newMetaData->serviceClientList_ = std::move(serviceClientList_); auto oldMetaData = metadata_.load(); metadata_.store(newMetaData); folly::rcu_retire(oldMetaData); From 7081904ee8c85c045514532682b6f94114eb4b98 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 6 Jan 2022 15:42:32 +0800 Subject: [PATCH 3/3] fix bug --- src/clients/meta/MetaClient.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 21da524c4dd..7205324de3d 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -195,9 +195,11 @@ bool MetaClient::loadUsersAndRoles() { // Remove expired users from cache auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, const std::unordered_set& userList) { - for (auto& ele : userMap) { - if (userList.count(ele.first) == 0) { - userMap.erase(ele.first); + for (auto iter = userMap.begin(); iter != userMap.end();) { + if (!userList.count(iter->first)) { + iter = userMap.erase(iter); + } else { + ++iter; } } };