Skip to content

Commit

Permalink
improve br, including:
Browse files Browse the repository at this point in the history
1. cancel block when  backup meta table failed
2. list cluster info only return alive info, and remove redundant alive agents
3. add machine replacing when store
4. add batch write when restore meta
5. improve restore test
  • Loading branch information
pengweisong committed Mar 24, 2022
1 parent e75be4e commit ccd2bf0
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 232 deletions.
16 changes: 12 additions & 4 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/meta/SchemaManager.h"
#include "kvstore/Common.h"
#include "kvstore/CompactionFilter.h"
#include "kvstore/KVEngine.h"
#include "kvstore/KVIterator.h"
#include "kvstore/PartManager.h"
#include "kvstore/raftex/RaftPart.h"
Expand Down Expand Up @@ -429,14 +430,21 @@ class KVStore {
const std::vector<std::string>& files) = 0;

/**
* @brief Write data to local storage engine only
* @brief return a WriteBatch object to do batch operation
*
* @return std::unique_ptr<WriteBatch>
*/
virtual std::unique_ptr<WriteBatch> startBatchWrite() = 0;

/**
* @brief Write batch data to local storage only
*
* @param spaceId
* @param keyValues Key/values to write into only local storage engine instead of multiple replica
* @param batch
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) = 0;
virtual nebula::cpp2::ErrorCode batchWriteWithoutReplicator(
GraphSpaceID spaceId, std::unique_ptr<WriteBatch> batch) = 0;

/**
* @brief Get the data paths
Expand Down
15 changes: 8 additions & 7 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1220,8 +1220,6 @@ nebula::cpp2::ErrorCode NebulaStore::restoreFromFiles(GraphSpaceID spaceId,
}
auto space = nebula::value(spaceRet);

DCHECK_EQ(space->engines_.size(), 1);

for (auto& engine : space->engines_) {
auto ret = engine->ingest(files, true);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -1232,19 +1230,22 @@ nebula::cpp2::ErrorCode NebulaStore::restoreFromFiles(GraphSpaceID spaceId,
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::cpp2::ErrorCode NebulaStore::multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) {
std::unique_ptr<WriteBatch> NebulaStore::startBatchWrite() {
return std::make_unique<RocksWriteBatch>();
}

nebula::cpp2::ErrorCode NebulaStore::batchWriteWithoutReplicator(
GraphSpaceID spaceId, std::unique_ptr<WriteBatch> batch) {
auto spaceRet = space(spaceId);
if (!ok(spaceRet)) {
LOG(WARNING) << "Get Space " << spaceId << " Failed";
return error(spaceRet);
}
auto space = nebula::value(spaceRet);

DCHECK_EQ(space->engines_.size(), 1);

for (auto& engine : space->engines_) {
auto ret = engine->multiPut(keyValues);
auto ret = engine->commitBatchWrite(
std::move(batch), FLAGS_rocksdb_disable_wal, FLAGS_rocksdb_wal_sync, true);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
Expand Down
15 changes: 11 additions & 4 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,14 +661,21 @@ class NebulaStore : public KVStore, public Handler {
void fetchDiskParts(SpaceDiskPartsMap& diskParts) override;

/**
* @brief Write data to local storage engine only
* @brief return a WriteBatch object to do batch operation
*
* @return std::unique_ptr<WriteBatch>
*/
std::unique_ptr<WriteBatch> startBatchWrite() override;

/**
* @brief Write batch to local storage engine only
*
* @param spaceId
* @param keyValues Key/values to write into only local storage engine instead of multiple replica
* @param batch
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode multiPutWithoutReplicator(GraphSpaceID spaceId,
std::vector<KV> keyValues) override;
nebula::cpp2::ErrorCode batchWriteWithoutReplicator(GraphSpaceID spaceId,
std::unique_ptr<WriteBatch> batch) override;

/**
* @brief Get the kvstore propery, only used in rocksdb
Expand Down
50 changes: 1 addition & 49 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,9 @@ namespace kvstore {
using fs::FileType;
using fs::FileUtils;

namespace {

/***************************************
*
* Implementation of WriteBatch
*
**************************************/
class RocksWriteBatch : public WriteBatch {
private:
rocksdb::WriteBatch batch_;

public:
RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {}

virtual ~RocksWriteBatch() = default;

nebula::cpp2::ErrorCode put(folly::StringPiece key, folly::StringPiece value) override {
if (batch_.Put(toSlice(key), toSlice(value)).ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}
}

nebula::cpp2::ErrorCode remove(folly::StringPiece key) override {
if (batch_.Delete(toSlice(key)).ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}
}

// Remove all keys in the range [start, end)
nebula::cpp2::ErrorCode removeRange(folly::StringPiece start, folly::StringPiece end) override {
if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}
}

rocksdb::WriteBatch* data() {
return &batch_;
}
};

} // Anonymous namespace

/***************************************
*
* Implementation of WriteBatch
* Implementation of RocksEngine
*
**************************************/
RocksEngine::RocksEngine(GraphSpaceID spaceId,
Expand Down
43 changes: 43 additions & 0 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,49 @@ class RocksCommonIter : public KVIterator {
std::unique_ptr<rocksdb::Iterator> iter_;
};

/***************************************
*
* Implementation of WriteBatch
*
**************************************/
class RocksWriteBatch : public WriteBatch {
private:
rocksdb::WriteBatch batch_;

public:
RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {}

virtual ~RocksWriteBatch() = default;

nebula::cpp2::ErrorCode put(folly::StringPiece key, folly::StringPiece value) override {
if (batch_.Put(toSlice(key), toSlice(value)).ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}
}

nebula::cpp2::ErrorCode remove(folly::StringPiece key) override {
if (batch_.Delete(toSlice(key)).ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}
}

// Remove all keys in the range [start, end)
nebula::cpp2::ErrorCode removeRange(folly::StringPiece start, folly::StringPiece end) override {
if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
} else {
return nebula::cpp2::ErrorCode::E_UNKNOWN;
}
}

rocksdb::WriteBatch* data() {
return &batch_;
}
};
/**
* @brief An implementation of KVEngine based on Rocksdb
*
Expand Down
13 changes: 13 additions & 0 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ ActiveHostsMan::getServicesInHost(kvstore::KVStore* kv, const std::string& hostn
auto addr = MetaKeyUtils::parseHostKey(iter->key());
HostInfo info = HostInfo::decode(iter->val());

// skip the service not alive
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph
if (info.role_ == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor;
}
int64_t threshold = expiredTime * 1000;
auto now = time::WallClock::fastNowInMilliSec();
if (now - info.lastHBTimeInMilliSec_ >= threshold) {
iter->next();
continue;
}

if (addr.host == hostname) {
hosts.emplace_back(addr, info.role_);
}
Expand Down
7 changes: 5 additions & 2 deletions src/meta/processors/admin/AgentHBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,18 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {

// join the service host info and dir info
auto services = std::move(nebula::value(servicesRet));
size_t agentCnt = 0;
std::vector<cpp2::ServiceInfo> serviceList;
for (const auto& [addr, role] : services) {
if (addr == agentAddr) {
// skip iteself
agentCnt++;
continue;
}

if (role == cpp2::HostRole::AGENT) {
LOG(INFO) << folly::sformat("there is another agent: {} in the host", addr.toString());
agentCnt++;
continue;
}

Expand All @@ -98,12 +101,12 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
serviceInfo.role_ref() = role;
serviceList.emplace_back(serviceInfo);
}
if (serviceList.size() != services.size() - 1) {
if (serviceList.size() != services.size() - agentCnt) {
ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE;
// missing some services' dir info
LOG(INFO) << folly::sformat(
"Missing some services's dir info, excepted service {}, but only got {}",
services.size() - 1,
services.size() - agentCnt,
serviceList.size());
break;
}
Expand Down
8 changes: 6 additions & 2 deletions src/meta/processors/admin/CreateBackupProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
handleErrorCode(ret);
ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Cancel write blocking error";
LOG(INFO) << "Cancel write blocking error:" << apache::thrift::util::enumNameSafe(ret);
}
onFinished();
return;
Expand All @@ -159,7 +159,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
handleErrorCode(nebula::error(sret));
ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Cancel write blocking error";
LOG(INFO) << "Cancel write blocking error:" << apache::thrift::util::enumNameSafe(ret);
}
onFinished();
return;
Expand All @@ -170,6 +170,10 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
if (!nebula::ok(backupFiles)) {
LOG(INFO) << "Failed backup meta";
handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_FAILED);
ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Cancel write blocking error:" << apache::thrift::util::enumNameSafe(ret);
}
onFinished();
return;
}
Expand Down
50 changes: 45 additions & 5 deletions src/meta/processors/admin/ListClusterInfoProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

#include "meta/processors/admin/ListClusterInfoProcessor.h"

DECLARE_int32(heartbeat_interval_secs);
DECLARE_int32(agent_heartbeat_interval_secs);
DECLARE_uint32(expired_time_factor);

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -40,6 +44,18 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq&) {
auto addr = MetaKeyUtils::parseHostKey(iter->key());
auto info = HostInfo::decode(iter->val());

if (addr.host == "") {
LOG(INFO) << folly::sformat("Bad format host:{}, skip it", addr.toString());
continue;
}

if (!isAlive(info)) {
LOG(INFO) << folly::sformat("{}:{} is not alive, will skip it",
apache::thrift::util::enumNameSafe(info.role_),
addr.toString());
continue;
}

cpp2::ServiceInfo service;
service.role_ref() = info.role_;
service.addr_ref() = addr;
Expand Down Expand Up @@ -89,16 +105,27 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq&) {
hostServices[metaAddr.host].push_back(service);
}

// check: there should be only one agent in each host
for (const auto& [host, services] : hostServices) {
// Check there is at least one agent in each host. If there are many ones, we will pick the first.
std::unordered_map<std::string, std::vector<cpp2::ServiceInfo>> agentServices;
for (auto& [host, services] : hostServices) {
int agentCount = 0;
for (auto& s : services) {
if (s.get_role() == cpp2::HostRole::AGENT) {
for (auto it = services.begin(); it != services.end();) {
if (it->get_role() == cpp2::HostRole::AGENT) {
agentCount++;

if (agentCount > 1) {
LOG(INFO) << folly::sformat(
"Will erase redundant agent {} from host {}", it->get_addr().toString(), host);
it = services.erase(it);
continue;
}
}

it++;
}

if (agentCount < 1) {
LOG(INFO) << folly::sformat("There are {} agent count is host {}", agentCount, host);
LOG(INFO) << folly::sformat("There is no agent in host {}", host);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE);
onFinished();
return;
Expand All @@ -114,5 +141,18 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq&) {
resp_.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
onFinished();
}

bool ListClusterInfoProcessor::isAlive(const HostInfo& info) {
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph
if (info.role_ == cpp2::HostRole::AGENT) { // agent
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor;
}
int64_t threshold = expiredTime * 1000;
auto now = time::WallClock::fastNowInMilliSec();

return now - info.lastHBTimeInMilliSec_ < threshold;
}

} // namespace meta
} // namespace nebula
2 changes: 2 additions & 0 deletions src/meta/processors/admin/ListClusterInfoProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class ListClusterInfoProcessor : public BaseProcessor<cpp2::ListClusterInfoResp>
private:
explicit ListClusterInfoProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ListClusterInfoResp>(kvstore) {}

bool isAlive(const HostInfo& info);
};
} // namespace meta
} // namespace nebula
Expand Down
Loading

0 comments on commit ccd2bf0

Please sign in to comment.