Skip to content

Commit

Permalink
refactor br related code (vesoft-inc#400)
Browse files Browse the repository at this point in the history
#### What type of PR is this?
- [ ] bug
- [ ] feature
- [x] enhancement

#### What does this PR do?
The original br tools require password-free ssh to each machine which add extra work for users.
This pr's main goal is to remove ssh dependency. To do this, we introduce an agent in each machine.
Then br can handle machines' data through agent's service.
Agent communicate with the meta service through heartbeat. By heartbeat, agent register itself
to meta service and pull the services it should supervise in its host.

The agent: vesoft-inc/nebula-agent#1
The br: vesoft-inc/nebula-br#22

This pr includes:
1. refactor br related code, including renaming and adjust code structure.
2. batch the snapshot rpc by spaces
3. add agent heartbeat
4. report data/root path in the storaged/graphd heartbeat

#### Which issue(s)/PR(s) this PR relates to?

The agent: vesoft-inc/nebula-agent#1
The br: vesoft-inc/nebula-br#22
  
#### Special notes for your reviewer, ex. impact of this fix, etc:


#### Additional context:


#### Checklist:
- [ ] Documentation affected (Please add the label if documentation needs to be modified.)
- [ ] Incompatible (If it is incompatible, please describe it and add corresponding label.)
- [ ] Need to cherry-pick (If need to cherry-pick to some branches, please label the destination version(s).)
- [ ] Performance impacted: Consumes more CPU/Memory


#### Release notes:
Please confirm whether to reflect in release notes and how to describe:
>                                                                 `


Migrated from vesoft-inc#3469

Co-authored-by: pengwei.song <[email protected]>
  • Loading branch information
nebula-bots and pengweisong authored Dec 29, 2021
1 parent 99c4bb3 commit 4a58503
Show file tree
Hide file tree
Showing 57 changed files with 1,031 additions and 660 deletions.
25 changes: 24 additions & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <folly/hash/Hash.h>
#include <thrift/lib/cpp/util/EnumUtils.h>

#include <boost/filesystem.hpp>

#include "clients/meta/FileBasedClusterIdMan.h"
#include "clients/meta/stats/MetaClientStats.h"
#include "common/base/Base.h"
Expand Down Expand Up @@ -56,6 +58,8 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool
updateLeader();
bgThread_ = std::make_unique<thread::GenericWorker>();
LOG(INFO) << "Create meta client to " << active_;
LOG(INFO) << folly::sformat(
"root path: {}, data path size: {}", options_.rootPath_, options_.dataPaths_.size());
}

MetaClient::~MetaClient() {
Expand Down Expand Up @@ -2523,6 +2527,20 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
}
}

// info used in the agent, only set once
// TOOD(spw): if we could add data path(disk) dynamicly in the future, it should be
// reported every time it changes
if (!dirInfoReported_) {
nebula::cpp2::DirInfo dirInfo;
if (options_.role_ == cpp2::HostRole::GRAPH) {
dirInfo.set_root(options_.rootPath_);
} else if (options_.role_ == cpp2::HostRole::STORAGE) {
dirInfo.set_root(options_.rootPath_);
dirInfo.set_data(options_.dataPaths_);
}
req.set_dir(dirInfo);
}

folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
VLOG(1) << "Send heartbeat to " << leader_ << ", clusterId " << req.get_cluster_id();
Expand All @@ -2545,7 +2563,12 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
metadLastUpdateTime_ = resp.get_last_update_time_in_ms();
VLOG(1) << "Metad last update time: " << metadLastUpdateTime_;
metaServerVersion_ = resp.get_meta_version();
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;

bool succeeded = resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
if (succeeded) {
dirInfoReported_ = true;
}
return succeeded;
},
std::move(promise),
true);
Expand Down
10 changes: 9 additions & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ struct MetaClientOptions {
serviceName_(opt.serviceName_),
skipConfig_(opt.skipConfig_),
role_(opt.role_),
gitInfoSHA_(opt.gitInfoSHA_) {}
gitInfoSHA_(opt.gitInfoSHA_),
dataPaths_(opt.dataPaths_),
rootPath_(opt.rootPath_) {}

// Current host address
HostAddr localHost_{"", 0};
Expand All @@ -221,6 +223,10 @@ struct MetaClientOptions {
cpp2::HostRole role_ = cpp2::HostRole::UNKNOWN;
// gitInfoSHA of Host using this client
std::string gitInfoSHA_{""};
// data path list, used in storaged
std::vector<std::string> dataPaths_;
// install path, used in metad/graphd/storaged
std::string rootPath_;
};

class MetaClient {
Expand Down Expand Up @@ -845,6 +851,8 @@ class MetaClient {
HostAddr leader_;
HostAddr localHost_;

// Only report dir info once when started
bool dirInfoReported_ = false;
struct ThreadLocalInfo {
int64_t localLastUpdateTime_{-2};
LocalCache localCache_;
Expand Down
1 change: 1 addition & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
X(E_GET_META_DIR_FAILURE, -2072) \
\
X(E_QUERY_NOT_FOUND, -2073) \
X(E_AGENT_HB_FAILUE, -2074) \
/* 3xxx for storaged */ \
X(E_CONSENSUS_ERROR, -3001) \
X(E_KEY_HAS_EXISTS, -3002) \
Expand Down
49 changes: 48 additions & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"machines", {"__machines__", false}},
{"host_dirs", {"__host_dirs__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
{"groups", {"__groups__", true}},
Expand Down Expand Up @@ -66,6 +67,7 @@ static const std::string kPartsTable = tableMaps.at("parts").first;
static const std::string kVersionsTable = systemTableMaps.at("versions").first; // NOLINT
static const std::string kHostsTable = systemTableMaps.at("hosts").first; // NOLINT
static const std::string kMachinesTable = systemTableMaps.at("machines").first; // NOLINT
static const std::string kHostDirsTable = systemTableMaps.at("host_dirs").first;// NOLINT
static const std::string kTagsTable = tableMaps.at("tags").first; // NOLINT
static const std::string kEdgesTable = tableMaps.at("edges").first; // NOLINT
static const std::string kIndexesTable = tableMaps.at("indexes").first; // NOLINT
Expand Down Expand Up @@ -111,7 +113,7 @@ static const std::unordered_map<std::string, std::pair<std::string, std::string>

const int kMaxIpAddrLen = 15; // '255.255.255.255'

std::string MetaKeyUtils::getIndexTable() { return tableMaps.at("index").first; }
std::string MetaKeyUtils::getIndexTable() { return kIndexTable; }

std::unordered_map<std::string,
std::pair<std::string, std::function<decltype(MetaKeyUtils::spaceId)>>>
Expand Down Expand Up @@ -270,6 +272,41 @@ HostAddr MetaKeyUtils::parseMachineKey(folly::StringPiece key) {
return MetaKeyUtils::deserializeHostAddr(key);
}

const std::string& MetaKeyUtils::hostDirPrefix() { return kHostDirsTable; }

const std::string MetaKeyUtils::hostDirHostPrefix(std::string host) {
return kHostDirsTable + host;
}

std::string MetaKeyUtils::hostDirKey(std::string host, Port port) {
std::string key;
key.reserve(kHostDirsTable.size() + host.size() + sizeof(port));
key.append(kHostDirsTable.data(), kHostDirsTable.size()).append(host);
key.append(reinterpret_cast<const char*>(&port), sizeof(Port));
return key;
}

HostAddr MetaKeyUtils::parseHostDirKey(folly::StringPiece key) {
HostAddr addr;
auto hostSize = key.size() - kHostDirsTable.size() - sizeof(Port);
addr.host = key.subpiece(kHostDirsTable.size(), hostSize).toString();
key.advance(kHostDirsTable.size() + hostSize);
addr.port = *reinterpret_cast<const Port*>(key.begin());
return addr;
}

std::string MetaKeyUtils::hostDirVal(cpp2::DirInfo dir) {
std::string val;
apache::thrift::CompactSerializer::serialize(dir, &val);
return val;
}

cpp2::DirInfo MetaKeyUtils::parseHostDir(folly::StringPiece val) {
cpp2::DirInfo dir;
apache::thrift::CompactSerializer::deserialize(val, dir);
return dir;
}

std::string MetaKeyUtils::hostKey(std::string addr, Port port) { return hostKeyV2(addr, port); }

std::string MetaKeyUtils::hostKeyV2(std::string addr, Port port) {
Expand Down Expand Up @@ -672,6 +709,16 @@ std::string MetaKeyUtils::indexSpaceKey(const std::string& name) {
return key;
}

std::string MetaKeyUtils::parseIndexSpaceKey(folly::StringPiece key) {
auto nameSize = key.size() - kIndexTable.size() - sizeof(EntryType);
return key.subpiece(kIndexTable.size() + sizeof(EntryType), nameSize).str();
}

EntryType MetaKeyUtils::parseIndexType(folly::StringPiece key) {
auto type = *reinterpret_cast<const EntryType*>(key.data() + kIndexTable.size());
return type;
}

std::string MetaKeyUtils::indexTagKey(GraphSpaceID spaceId, const std::string& name) {
EntryType type = EntryType::TAG;
std::string key;
Expand Down
24 changes: 22 additions & 2 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,25 @@ class MetaKeyUtils final {

static HostAddr parseMachineKey(folly::StringPiece key);

static std::string hostKey(std::string ip, Port port);
// hostDir store service(metad/storaged/graphd) address -> dir info(root path and data paths)
// agent will use these to start/stop service and backup/restore data
static std::string hostDirKey(std::string ip);

static std::string hostKeyV2(std::string addr, Port port);
static std::string hostDirKey(std::string host, Port port);

static HostAddr parseHostDirKey(folly::StringPiece key);

static const std::string& hostDirPrefix();

static const std::string hostDirHostPrefix(std::string host);

static std::string hostDirVal(cpp2::DirInfo dir);

static cpp2::DirInfo parseHostDir(folly::StringPiece val);

static std::string hostKey(std::string host, Port port);

static std::string hostKeyV2(std::string host, Port port);

static const std::string& hostPrefix();

Expand Down Expand Up @@ -213,6 +229,10 @@ class MetaKeyUtils final {

static std::string indexSpaceKey(const std::string& name);

static std::string parseIndexSpaceKey(folly::StringPiece key);

static EntryType parseIndexType(folly::StringPiece key);

static std::string indexTagKey(GraphSpaceID spaceId, const std::string& name);

// drainer
Expand Down
4 changes: 3 additions & 1 deletion src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/encryption/License.h"
#include "common/fs/FileUtils.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "common/hdfs/HdfsHelper.h"
#include "common/meta/ServerBasedSchemaManager.h"
Expand Down Expand Up @@ -160,7 +161,8 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
}

kvOptions.isMeta_ = true;
kvOptions.dataPaths_ = {FLAGS_data_path};
auto absolute = boost::filesystem::absolute(FLAGS_data_path);
kvOptions.dataPaths_ = {absolute.string()};
kvOptions.listenerPath_ = FLAGS_listener_path;
kvOptions.partMan_ = std::move(partMan);

Expand Down
12 changes: 9 additions & 3 deletions src/daemons/StorageDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/fs/FileUtils.h"
#include "common/network/NetworkUtils.h"
#include "common/process/ProcessUtils.h"
#include "common/time/TimezoneInfo.h"
Expand Down Expand Up @@ -136,9 +137,14 @@ int main(int argc, char *argv[]) {

std::vector<std::string> paths;
folly::split(",", FLAGS_data_path, paths, true);
std::transform(paths.begin(), paths.end(), paths.begin(), [](auto &p) {
return folly::trimWhitespace(p).str();
});
// make the paths absolute
std::transform(
paths.begin(), paths.end(), paths.begin(), [](const std::string &p) -> std::string {
auto path = folly::trimWhitespace(p).str();
path = boost::filesystem::absolute(path).string();
LOG(INFO) << "data path= " << path;
return path;
});
if (paths.empty()) {
LOG(ERROR) << "Bad data_path format:" << FLAGS_data_path;
return EXIT_FAILURE;
Expand Down
3 changes: 3 additions & 0 deletions src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "graph/service/GraphService.h"

#include <boost/filesystem.hpp>

#include "clients/storage/StorageClient.h"
#include "common/base/Base.h"
#include "common/encryption/MD5Utils.h"
Expand Down Expand Up @@ -35,6 +37,7 @@ Status GraphService::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecuto
options.role_ = meta::cpp2::HostRole::GRAPH;
options.localHost_ = hostAddr;
options.gitInfoSHA_ = gitInfoSha();
options.rootPath_ = boost::filesystem::current_path().string();

metaClient_ = std::make_unique<meta::MetaClient>(ioExecutor, std::move(addrs.value()), options);

Expand Down
15 changes: 4 additions & 11 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,11 @@ struct DirInfo {
2: list<binary> data,
}

struct NodeInfo {
1: HostAddr host,
2: DirInfo dir,
}

struct PartitionBackupInfo {
1: map<PartitionID, LogInfo> (cpp.template = "std::unordered_map") info,
}

struct CheckpointInfo {
1: PartitionBackupInfo partition_info,
1: GraphSpaceID space_id,
2: map<PartitionID, LogInfo> (cpp.template = "std::unordered_map") parts,
// storage checkpoint directory name
2: binary path,
3: binary path,
}

// used for raft and drainer
Expand Down Expand Up @@ -414,6 +406,7 @@ enum ErrorCode {
E_GET_META_DIR_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,
E_AGENT_HB_FAILUE = -2074,

E_INVALID_VARIABLE = -2080,
E_VARIABLE_TYPE_VALUE_MISMATCH = -2081,
Expand Down
Loading

0 comments on commit 4a58503

Please sign in to comment.