Skip to content

Commit

Permalink
address some comment
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs committed Dec 30, 2021
1 parent 8af8e1a commit 1f53617
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 39 deletions.
52 changes: 17 additions & 35 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool
: ioThreadPool_(ioThreadPool),
addrs_(std::move(addrs)),
options_(options),
sessionMap_(new SessionMap{}),
killedPlans_(new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>{}),
metadata_(new MetaData()) {
CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
CHECK(!addrs_.empty())
Expand All @@ -67,8 +65,6 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool

MetaClient::~MetaClient() {
stop();
delete sessionMap_.load();
delete killedPlans_.load();
delete metadata_.load();
VLOG(3) << "~MetaClient";
}
Expand Down Expand Up @@ -165,11 +161,8 @@ bool MetaClient::loadUsersAndRoles() {
userRolesMap[user.first] = rolesRet.value();
userPasswordMap[user.first] = user.second;
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
userRolesMap_ = std::move(userRolesMap);
userPasswordMap_ = std::move(userPasswordMap);
}
userRolesMap_ = std::move(userRolesMap);
userPasswordMap_ = std::move(userPasswordMap);
return true;
}

Expand Down Expand Up @@ -290,7 +283,6 @@ bool MetaClient::loadData() {

decltype(localCache_) oldCache;
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
oldCache = std::move(localCache_);
localCache_ = std::move(cache);
spaceIndexByName_ = std::move(spaceIndexByName);
Expand All @@ -307,7 +299,6 @@ bool MetaClient::loadData() {
localDataLastUpdateTime_.store(metadLastUpdateTime_.load());
auto newMetaData = new MetaData();

folly::RWSpinLock::ReadHolder holder(localCacheLock_);
for (auto& spaceInfo : localCache_) {
GraphSpaceID spaceId = spaceInfo.first;
std::shared_ptr<SpaceInfoCache> info = spaceInfo.second;
Expand All @@ -331,7 +322,9 @@ bool MetaClient::loadData() {
newMetaData->storageHosts_ = storageHosts_;
newMetaData->fulltextIndexMap_ = fulltextIndexMap_;
newMetaData->userPasswordMap_ = userPasswordMap_;

newMetaData->sessionMap_ = std::move(sessionMap_);
newMetaData->killedPlans_ = std::move(killedPlans_);
newMetaData->fulltextClientList_ = std::move(fulltextClientList_);
auto oldMetaData = metadata_.load();
metadata_.store(newMetaData);
folly::rcu_retire(oldMetaData);
Expand Down Expand Up @@ -557,10 +550,7 @@ bool MetaClient::loadFulltextClients() {
LOG(ERROR) << "List fulltext services failed, status:" << ftRet.status();
return false;
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
fulltextClientList_ = std::move(ftRet).value();
}
fulltextClientList_ = std::move(ftRet).value();
return true;
}

Expand All @@ -570,10 +560,7 @@ bool MetaClient::loadFulltextIndexes() {
LOG(ERROR) << "List fulltext indexes failed, status:" << ftRet.status();
return false;
}
{
folly::RWSpinLock::WriteHolder holder(localCacheLock_);
fulltextIndexMap_ = std::move(ftRet).value();
}
fulltextIndexMap_ = std::move(ftRet).value();
return true;
}

Expand Down Expand Up @@ -3344,7 +3331,8 @@ StatusOr<std::vector<cpp2::FTClient>> MetaClient::getFTClientsFromCache() {
if (!ready_) {
return Status::Error("Not ready!");
}
return fulltextClientList_;
folly::rcu_reader guard;
return metadata_.load()->fulltextClientList_;
}

folly::Future<StatusOr<bool>> MetaClient::createFTIndex(const std::string& name,
Expand Down Expand Up @@ -3589,22 +3577,16 @@ bool MetaClient::loadSessions() {
LOG(ERROR) << "List sessions failed, status:" << session_list.status();
return false;
}
SessionMap* oldSessionMap = sessionMap_.load();
SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
auto oldKilledPlan = killedPlans_.load();
auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
sessionMap_.clear();
killedPlans_.clear();
for (auto& session : session_list.value().get_sessions()) {
(*newSessionMap)[session.get_session_id()] = session;
sessionMap_[session.get_session_id()] = session;
for (auto& query : session.get_queries()) {
if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
newKilledPlan->insert({session.get_session_id(), query.first});
killedPlans_.insert({session.get_session_id(), query.first});
}
}
}
sessionMap_.store(newSessionMap);
killedPlans_.store(newKilledPlan);
folly::rcu_retire(oldKilledPlan);
folly::rcu_retire(oldSessionMap);
return true;
}

Expand All @@ -3613,9 +3595,9 @@ StatusOr<cpp2::Session> MetaClient::getSessionFromCache(const nebula::SessionID&
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
auto session_map = sessionMap_.load();
auto it = session_map->find(session_id);
if (it != session_map->end()) {
auto& sessionMap = metadata_.load()->sessionMap_;
auto it = sessionMap.find(session_id);
if (it != sessionMap.end()) {
return it->second;
}
return Status::SessionNotFound();
Expand All @@ -3629,7 +3611,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId)
return false;
}
folly::rcu_reader guard;
return killedPlans_.load()->count({sessionId, planId});
return metadata_.load()->killedPlans_.count({sessionId, planId});
}

Status MetaClient::verifyVersion() {
Expand Down
11 changes: 8 additions & 3 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,11 @@ class MetaClient {
std::vector<HostAddr> storageHosts_;
FTIndexMap fulltextIndexMap_;
UserPasswordMap userPasswordMap_;

SessionMap sessionMap_;
folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>> killedPlans_;

FulltextClientsList fulltextClientList_;
};

void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool);
Expand All @@ -813,7 +818,7 @@ class MetaClient {
FulltextClientsList fulltextClientList_;
FTIndexMap fulltextIndexMap_;

mutable folly::RWSpinLock localCacheLock_;
// mutable folly::RWSpinLock localCacheLock_;
// The listener_ is the NebulaStore
MetaChangedListener* listener_{nullptr};
// The lock used to protect listener_
Expand All @@ -831,8 +836,8 @@ class MetaClient {
MetaClientOptions options_;
std::vector<HostAddr> storageHosts_;
int64_t heartbeatTime_;
std::atomic<SessionMap*> sessionMap_;
std::atomic<folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>*> killedPlans_;
SessionMap sessionMap_;
folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>> killedPlans_;
std::atomic<MetaData*> metadata_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/KillQueryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class KillQueryMetaWrapper {
public:
explicit KillQueryMetaWrapper(MetaClient* client) : client_(client) {}
void killQuery(SessionID session_id, ExecutionPlanID plan_id) {
client_->killedPlans_.load()->emplace(session_id, plan_id);
client_->metadata_.load()->killedPlans_.emplace(session_id, plan_id);
}

private:
Expand Down

0 comments on commit 1f53617

Please sign in to comment.