[fix] Change leader set logic
reindexer-bot committed Jan 19, 2024
1 parent 652d165 commit 919a4e3
Showing 105 changed files with 2,124 additions and 1,109 deletions.
2 changes: 1 addition & 1 deletion cpp_src/CMakeLists.txt
@@ -740,7 +740,7 @@ if (NOT WIN32)
SET(CMAKE_INSTALL_DEFAULT_COMPONENT_NAME "server")
SET(DIST_INCLUDE_FILES
"tools/errors.h" "tools/serializer.h" "tools/varint.h" "tools/stringstools.h" "tools/customhash.h" "tools/assertrx.h" "tools/jsonstring.h"
"tools/verifying_updater.h"
"tools/verifying_updater.h" "tools/customlocal.h"
"core/reindexer.h" "core/type_consts.h" "core/item.h" "core/payload/payloadvalue.h" "core/payload/payloadiface.h" "core/indexopts.h"
"core/namespacedef.h" "core/keyvalue/variant.h" "core/keyvalue/geometry.h" "core/sortingprioritiestable.h"
"core/rdxcontext.h" "core/activity_context.h" "core/activity.h" "core/activitylog.h" "core/type_consts_helpers.h" "core/payload/fieldsset.h" "core/payload/payloadtype.h"
32 changes: 21 additions & 11 deletions cpp_src/client/coroqueryresults.cc
@@ -87,15 +87,21 @@ void CoroQueryResults::Bind(std::string_view rawResult, RPCQrId id, const Query
PayloadType("tmp").clone()->deserialize(ser);
},
opts, i_.parsingData_);

const auto copyStart = i_.lazyMode_ ? rawResult.begin() : (rawResult.begin() + ser.Pos());
const auto rawResLen = std::distance(copyStart, rawResult.end());
if rx_unlikely (rawResLen > QrRawBuffer::max_size()) {
throw Error(
errLogic,
"client::QueryResults::Bind: rawResult buffer overflow. Max size if %d bytes, but %d bytes requested. Try to reduce "
"fetch limit (current limit is %d)",
QrRawBuffer::max_size(), rawResLen, i_.fetchAmount_);
}

i_.rawResult_.assign(copyStart, rawResult.end());
} catch (const Error &err) {
i_.status_ = err;
}

if (i_.lazyMode_) {
i_.rawResult_.assign(rawResult.begin(), rawResult.end());
} else {
i_.rawResult_.assign(rawResult.begin() + ser.Pos(), rawResult.end());
}
}

void CoroQueryResults::fetchNextResults() {
@@ -120,11 +126,15 @@ void CoroQueryResults::handleFetchedBuf(net::cproto::CoroRPCAnswer &ans) {
ResultSerializer ser(rawResult);

ser.GetRawQueryParams(i_.queryParams_, nullptr, ResultSerializer::Options{}, i_.parsingData_);
if (i_.lazyMode_) {
i_.rawResult_.assign(rawResult.begin(), rawResult.end());
} else {
i_.rawResult_.assign(rawResult.begin() + ser.Pos(), rawResult.end());
}
const auto copyStart = i_.lazyMode_ ? rawResult.begin() : (rawResult.begin() + ser.Pos());
const auto rawResLen = std::distance(copyStart, rawResult.end());
if rx_unlikely (rawResLen > QrRawBuffer::max_size()) {
throw Error(errLogic,
"client::QueryResults::fetchNextResults: rawResult buffer overflow. Max size if %d bytes, but %d bytes requested. Try "
"to reduce fetch limit (current limit is %d)",
QrRawBuffer::max_size(), rawResLen, i_.fetchAmount_);
}
i_.rawResult_.assign(copyStart, rawResult.end());
i_.status_ = Error();
}

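Both hunks above replace an unconditional copy with a size check before the raw RPC answer is copied into the fixed-capacity `QrRawBuffer`. A minimal standalone sketch of the same guard, with simplified, illustrative names (`kMaxRawBuffer` and `assignRaw` are not reindexer API):

```cpp
#include <cstddef>
#include <iterator>
#include <stdexcept>
#include <string>
#include <string_view>

constexpr std::size_t kMaxRawBuffer = 1 << 20;  // assumed capacity, for illustration only

// In lazy mode the whole answer is kept; otherwise the already-parsed header
// (parsedPos bytes) is skipped. Either way the copy is bounds-checked first.
void assignRaw(std::string& dst, std::string_view src, std::size_t parsedPos, bool lazyMode) {
    const auto begin = lazyMode ? src.begin() : src.begin() + parsedPos;
    const auto len = static_cast<std::size_t>(std::distance(begin, src.end()));
    if (len > kMaxRawBuffer) {
        throw std::runtime_error("raw buffer overflow: " + std::to_string(len) +
                                 " bytes requested, max is " + std::to_string(kMaxRawBuffer));
    }
    dst.assign(begin, src.end());
}
```

Reporting the current fetch limit in the real error message gives the caller an actionable fix: reducing the fetch limit shrinks each raw answer below the buffer's capacity.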
6 changes: 3 additions & 3 deletions cpp_src/client/itemimplbase.cc
@@ -59,7 +59,7 @@ void ItemImplBase::FromCJSON(std::string_view slice) {
const auto tupleSize = ser_.Len();
tupleHolder_ = ser_.DetachBuf();
tupleData_ = std::string_view(reinterpret_cast<char *>(tupleHolder_.get()), tupleSize);
pl.Set(0, Variant(p_string(&tupleData_)));
pl.Set(0, Variant(p_string(&tupleData_), Variant::no_hold_t{}));
}

Error ItemImplBase::FromJSON(std::string_view slice, char **endp, bool /*pkOnly*/) {
@@ -95,7 +95,7 @@ Error ItemImplBase::FromJSON(std::string_view slice, char **endp, bool /*pkOnly*
const auto tupleSize = ser_.Len();
tupleHolder_ = ser_.DetachBuf();
tupleData_ = std::string_view(reinterpret_cast<char *>(tupleHolder_.get()), tupleSize);
pl.Set(0, Variant(p_string(&tupleData_)));
pl.Set(0, Variant(p_string(&tupleData_), Variant::no_hold_t{}));
}
return err;
}
@@ -116,7 +116,7 @@ Error ItemImplBase::FromMsgPack(std::string_view buf, size_t &offset) {
const auto tupleSize = ser_.Len();
tupleHolder_ = ser_.DetachBuf();
tupleData_ = std::string_view(reinterpret_cast<char *>(tupleHolder_.get()), tupleSize);
pl.Set(0, Variant(p_string(&tupleData_)));
pl.Set(0, Variant(p_string(&tupleData_), Variant::no_hold_t{}));
}
return err;
}
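All three hunks above pass `Variant::no_hold_t{}` so the `Variant` stores a non-owning reference to `tupleData_` instead of copying it; that is safe here because `tupleHolder_` already owns the underlying buffer for the item's lifetime. A sketch of the tag-dispatch idiom behind this, with simplified types (`Value` and the local `no_hold_t` below are illustrative, not the real `Variant`):

```cpp
#include <string>
#include <string_view>

struct no_hold_t {};  // tag type: caller guarantees the referenced memory outlives the value

class Value {
public:
    explicit Value(std::string_view v) : owned_(v), view_(owned_) {}  // holding ctor: copies
    Value(std::string_view v, no_hold_t) noexcept : view_(v) {}       // non-holding ctor: view only
    std::string_view get() const noexcept { return view_; }

private:
    std::string owned_;      // filled only by the holding constructor
    std::string_view view_;  // copy/move omitted; a real type must rebind view_ on copy
};
```

The non-holding form saves an allocation and a copy per parsed tuple, at the price of a lifetime contract the caller must uphold.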
4 changes: 2 additions & 2 deletions cpp_src/client/namespace.h
@@ -19,11 +19,11 @@ class Namespace {
template <typename ClientT>
Item NewItem(ClientT& client, std::chrono::milliseconds execTimeout);
void TryReplaceTagsMatcher(TagsMatcher&& tm, bool checkVersion = true);
TagsMatcher GetTagsMatcher() const {
TagsMatcher GetTagsMatcher() const noexcept {
shared_lock<shared_timed_mutex> lk(lck_);
return tagsMatcher_;
}
int GetStateToken() const {
int GetStateToken() const noexcept {
shared_lock<shared_timed_mutex> lk(lck_);
return int(tagsMatcher_.stateToken());
}
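Marking these shared-lock getters `noexcept` documents that no exception escapes to callers; if lock acquisition ever failed, the process would terminate rather than unwind. A sketch of the read-accessor pattern, assuming a `shared_mutex`-style member like the class above uses:

```cpp
#include <shared_mutex>

class TagsCache {
public:
    int StateToken() const noexcept {
        std::shared_lock lk(mtx_);  // shared ownership: concurrent readers do not block each other
        return stateToken_;
    }

private:
    mutable std::shared_mutex mtx_;  // mutable so const getters can still lock
    int stateToken_ = 0;
};
```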
4 changes: 2 additions & 2 deletions cpp_src/client/queryresults.cc
@@ -61,8 +61,8 @@ QueryResults::~QueryResults() {
results_.setClosed();
}

TagsMatcher QueryResults::GetTagsMatcher(int nsid) const { return results_.GetTagsMatcher(nsid); }
TagsMatcher QueryResults::GetTagsMatcher(std::string_view ns) const { return results_.GetTagsMatcher(ns); }
TagsMatcher QueryResults::GetTagsMatcher(int nsid) const noexcept { return results_.GetTagsMatcher(nsid); }
TagsMatcher QueryResults::GetTagsMatcher(std::string_view ns) const noexcept { return results_.GetTagsMatcher(ns); }

} // namespace client
} // namespace reindexer
8 changes: 4 additions & 4 deletions cpp_src/client/queryresults.h
@@ -81,10 +81,10 @@ class QueryResults {
bool IsCacheEnabled() const noexcept { return results_.IsCacheEnabled(); }

int GetMergedNSCount() const noexcept { return results_.GetMergedNSCount(); }
TagsMatcher GetTagsMatcher(int nsid) const;
TagsMatcher GetTagsMatcher(std::string_view ns) const;
PayloadType GetPayloadType(int nsid) const { return results_.GetPayloadType(nsid); }
PayloadType GetPayloadType(std::string_view ns) const { return results_.GetPayloadType(ns); }
TagsMatcher GetTagsMatcher(int nsid) const noexcept;
TagsMatcher GetTagsMatcher(std::string_view ns) const noexcept;
PayloadType GetPayloadType(int nsid) const noexcept { return results_.GetPayloadType(nsid); }
PayloadType GetPayloadType(std::string_view ns) const noexcept { return results_.GetPayloadType(ns); }

int GetFormat() const noexcept { return results_.GetFormat(); }
int GetFlags() const noexcept { return results_.GetFlags(); }
9 changes: 7 additions & 2 deletions cpp_src/cluster/config.cc
@@ -866,6 +866,7 @@ Error ShardingConfig::FromYAML(const std::string &yaml) {
thisShardId = root["this_shard_id"].as<int>();
reconnectTimeout = std::chrono::milliseconds(root["reconnect_timeout_msec"].as<int>(reconnectTimeout.count()));
shardsAwaitingTimeout = std::chrono::seconds(root["shards_awaiting_timeout_sec"].as<int>(shardsAwaitingTimeout.count()));
configRollbackTimeout = std::chrono::seconds(root["config_rollback_timeout_sec"].as<int>(configRollbackTimeout.count()));
proxyConnCount = root["proxy_conn_count"].as<int>(proxyConnCount);
proxyConnConcurrency = root["proxy_conn_concurrency"].as<int>(proxyConnConcurrency);
proxyConnThreads = root["proxy_conn_threads"].as<int>(proxyConnThreads);
@@ -922,6 +923,7 @@ Error ShardingConfig::FromJSON(const gason::JsonNode &root) {
thisShardId = root["this_shard_id"].As<int>();
reconnectTimeout = std::chrono::milliseconds(root["reconnect_timeout_msec"].As<int>(reconnectTimeout.count()));
shardsAwaitingTimeout = std::chrono::seconds(root["shards_awaiting_timeout_sec"].As<int>(shardsAwaitingTimeout.count()));
configRollbackTimeout = std::chrono::seconds(root["config_rollback_timeout_sec"].As<int>(configRollbackTimeout.count()));
proxyConnCount = root["proxy_conn_count"].As<int>(proxyConnCount);
proxyConnConcurrency = root["proxy_conn_concurrency"].As<int>(proxyConnConcurrency);
proxyConnThreads = root["proxy_conn_threads"].As<int>(proxyConnThreads);
@@ -937,8 +939,9 @@ Error ShardingConfig::FromJSON(const gason::JsonNode &root) {
bool operator==(const ShardingConfig &lhs, const ShardingConfig &rhs) {
return lhs.namespaces == rhs.namespaces && lhs.thisShardId == rhs.thisShardId && lhs.shards == rhs.shards &&
lhs.reconnectTimeout == rhs.reconnectTimeout && lhs.shardsAwaitingTimeout == rhs.shardsAwaitingTimeout &&
lhs.proxyConnCount == rhs.proxyConnCount && lhs.proxyConnConcurrency == rhs.proxyConnConcurrency &&
rhs.proxyConnThreads == lhs.proxyConnThreads && rhs.sourceId == lhs.sourceId;
lhs.configRollbackTimeout == rhs.configRollbackTimeout && lhs.proxyConnCount == rhs.proxyConnCount &&
lhs.proxyConnConcurrency == rhs.proxyConnConcurrency && rhs.proxyConnThreads == lhs.proxyConnThreads &&
rhs.sourceId == lhs.sourceId;
}
bool operator==(const ShardingConfig::Key &lhs, const ShardingConfig::Key &rhs) {
return lhs.shardId == rhs.shardId && lhs.algorithmType == rhs.algorithmType && lhs.RelaxCompare(rhs.values) == 0;
@@ -972,6 +975,7 @@ YAML::Node ShardingConfig::GetYAMLObj() const {
yaml["this_shard_id"] = thisShardId;
yaml["reconnect_timeout_msec"] = reconnectTimeout.count();
yaml["shards_awaiting_timeout_sec"] = shardsAwaitingTimeout.count();
yaml["config_rollback_timeout_sec"] = configRollbackTimeout.count();
yaml["proxy_conn_count"] = proxyConnCount;
yaml["proxy_conn_concurrency"] = proxyConnConcurrency;
yaml["proxy_conn_threads"] = proxyConnThreads;
Expand Down Expand Up @@ -1015,6 +1019,7 @@ void ShardingConfig::GetJSON(JsonBuilder &jb) const {
jb.Put("this_shard_id", thisShardId);
jb.Put("reconnect_timeout_msec", reconnectTimeout.count());
jb.Put("shards_awaiting_timeout_sec", shardsAwaitingTimeout.count());
jb.Put("config_rollback_timeout_sec", configRollbackTimeout.count());
jb.Put("proxy_conn_count", proxyConnCount);
jb.Put("proxy_conn_concurrency", proxyConnConcurrency);
jb.Put("proxy_conn_threads", proxyConnThreads);
1 change: 1 addition & 0 deletions cpp_src/cluster/config.h
@@ -236,6 +236,7 @@ struct ShardingConfig {
int thisShardId = ShardingKeyType::ProxyOff;
std::chrono::milliseconds reconnectTimeout = std::chrono::milliseconds(3000);
std::chrono::seconds shardsAwaitingTimeout = std::chrono::seconds(30);
std::chrono::seconds configRollbackTimeout = std::chrono::seconds(30);
int proxyConnCount = kDefaultShardingProxyConnCount;
int proxyConnConcurrency = kDefaultShardingProxyCoroPerConn;
int proxyConnThreads = kDefaultShardingProxyConnThreads;
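The new `configRollbackTimeout` field (default 30 s) is threaded through YAML parsing, JSON parsing, serialization, and `operator==` in the hunks above. A hypothetical sharding-config fragment showing the new key next to its neighbors, using the defaults visible in `config.h`:

```yaml
this_shard_id: 0
reconnect_timeout_msec: 3000
shards_awaiting_timeout_sec: 30
config_rollback_timeout_sec: 30   # added by this commit; falls back to 30 when absent
```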
112 changes: 58 additions & 54 deletions cpp_src/cluster/raftmanager.cc
@@ -34,45 +34,53 @@ void RaftManager::Configure(const ReplicationConfigData& baseConfig, const Clust
}

Error RaftManager::SendDesiredLeaderId(int nextServerId) {
logTrace("%d SendDesiredLeaderId nextLeaderId = %d", serverId_, nextServerId);
std::vector<client::RaftClient> tmpClients; // Using a separate client set here to avoid intersection with the leader's pings
std::shared_ptr<void> CloseConnection(nullptr, [this, &tmpClients](void*) {
coroutine::wait_group wgStop;
for (auto& client : tmpClients) {
loop_.spawn(wgStop, [&client]() { client.Stop(); });
}
wgStop.wait();
});

client::ReindexerConfig rpcCfg;
rpcCfg.AppName = "raft_manager_tmp";
rpcCfg.NetTimeout = kRaftTimeout;
rpcCfg.EnableCompression = false;
rpcCfg.RequestDedicatedThread = true;
tmpClients.reserve(nodes_.size());
size_t nextServerNodeIndex = nodes_.size();
for (size_t i = 0; i < nodes_.size(); i++) {
for (size_t i = 0; i < nodes_.size(); ++i) {
auto& client = tmpClients.emplace_back(rpcCfg);
auto err = client.Connect(nodes_[i].dsn, loop_);
(void)err; // Ignore connection errors. Handle them on the status phase
if (nodes_[i].serverId == nextServerId) {
nextServerNodeIndex = i;
break;
}
}

std::shared_ptr<void> CloseConnection(nullptr, [this](void*) {
if (GetLeaderId() != serverId_) { // leader node clients, used for pinging
coroutine::wait_group wgStop;
for (auto& node : nodes_) {
loop_.spawn(wgStop, [&node]() { node.client.Stop(); });
err = client.WithTimeout(kDesiredLeaderTimeout).Status(true);
if (!err.ok()) {
return Error(err.code(), "Target node %s is not available.", nodes_[i].dsn);
}
wgStop.wait();
}
});

if (nextServerNodeIndex != nodes_.size()) {
Error err = clientStatus(nextServerNodeIndex, kDesiredLeaderTimeout);
if (!err.ok()) {
return Error(err.code(), "Target node %s is not available.", nodes_[nextServerNodeIndex].dsn);
}
}

uint32_t okCount = 1;
coroutine::wait_group wg;
std::string errString;

for (size_t nodeId = 0; nodeId < nodes_.size(); ++nodeId) {
auto sendDesiredServerIdToNode = [](client::RaftClient& client, int nextServerId) {
auto err = client.WithTimeout(kDesiredLeaderTimeout).Status(true);
return !err.ok() ? err : client.WithTimeout(kDesiredLeaderTimeout).SetDesiredLeaderId(nextServerId);
};

for (size_t nodeId = 0; nodeId < tmpClients.size(); ++nodeId) {
if (nodeId == nextServerNodeIndex) {
continue;
}

loop_.spawn(wg, [this, nodeId, nextServerId, &errString, &okCount] {
loop_.spawn(wg, [this, nodeId, nextServerId, &errString, &okCount, &tmpClients, &sendDesiredServerIdToNode] {
try {
const auto err = sendDesiredServerIdToNode(nodeId, nextServerId);
if (err.ok()) {
logTrace("%d Sending desired server ID (%d) to node with server ID %d", serverId_, nextServerId, nodes_[nodeId].serverId);
if (auto err = sendDesiredServerIdToNode(tmpClients[nodeId], nextServerId); err.ok()) {
++okCount;
} else {
errString += "[" + err.what() + "]";
@@ -84,33 +92,45 @@ Error RaftManager::SendDesiredLeaderId(int nextServerId) {
}
wg.wait();
if (nextServerNodeIndex != nodes_.size()) {
Error err = sendDesiredServerIdToNode(nextServerNodeIndex, nextServerId);
if (!err.ok()) {
logTrace("%d Sending desired server ID (%d) to node with server ID %d", serverId_, nextServerId, nextServerId);
if (auto err = sendDesiredServerIdToNode(tmpClients[nextServerNodeIndex], nextServerId); !err.ok()) {
return err;
}
okCount++;
}

if (okCount >= GetConsensusForN(nodes_.size() + 1)) {
return errOK;
return Error();
}

return Error(errNetwork, "Can't send nextLeaderId to servers okCount %d err: %s", okCount, errString);
}

RaftInfo::Role RaftManager::Elections() {
void RaftManager::SetDesiredLeaderId(int serverId) {
logInfo("%d Set (%d) as a desired leader", serverId_, serverId);
nextServerId_.SetNextServerId(serverId);
lastLeaderPingTs_ = {ClockT::time_point()};
}

RaftInfo::Role RaftManager::Elections(bool forceElectionsRestart) {
std::vector<coroutine::routine_t> succeedRoutines;
succeedRoutines.reserve(nodes_.size());
while (!terminate_.load()) {
const int nextServerId = nextServerId_.GetNextServerId();
const bool isDesiredLeader = (nextServerId == serverId_);
if (!isDesiredLeader) {
if (nextServerId != -1) {
endElections(GetTerm(), RaftInfo::Role::Follower);
logInfo("Skipping elections (desired leader id is %d)", serverId_, nextServerId);
return RaftInfo::Role::Follower;
}
if (!forceElectionsRestart && hasRecentLeadersPing(ClockT::now()) && endElections(GetTerm(), RaftInfo::Role::Follower)) {
logInfo("Skipping elections (node is already connected to the active leader (%d))", serverId_, nextServerId);
return RaftInfo::Role::Follower;
}
}
int32_t term = beginElectionsTerm(nextServerId);
logInfo("Starting new elections term for %d. Term number: %d", serverId_, term);
if (nextServerId != -1 && nextServerId != serverId_) {
endElections(term, RaftInfo::Role::Follower);
logInfo("Skipping elections (desired leader id is %d)", serverId_, nextServerId);
return RaftInfo::Role::Follower;
}
const bool isDesiredLeader = (nextServerId == serverId_);
coroutine::wait_group wg;
succeedRoutines.resize(0);
struct {
Expand All @@ -122,7 +142,7 @@ RaftInfo::Role RaftManager::Elections() {
loop_.spawn(wg, [this, &electionsStat, nodeId, term, &succeedRoutines, isDesiredLeader] {
auto& node = nodes_[nodeId];
if (!node.client.Status().ok()) {
node.client.Connect(node.dsn, loop_, client::ConnectOpts().WithExpectedClusterID(clusterID_));
node.client.Connect(node.dsn, loop_, createConnectionOpts());
}
NodeData suggestion, result;
suggestion.serverId = serverId_;
@@ -201,8 +221,8 @@ RaftInfo::Role RaftManager::Elections() {
return RaftInfo::Role::Follower;
}

bool RaftManager::LeaderIsAvailable(ClockT::time_point now) {
return ((now - lastLeaderPingTs_.load()) < kMinLeaderAwaitInterval) || (GetRole() == RaftInfo::Role::Leader);
bool RaftManager::LeaderIsAvailable(ClockT::time_point now) const noexcept {
return hasRecentLeadersPing(now) || (GetRole() == RaftInfo::Role::Leader);
}

bool RaftManager::FollowersAreAvailable() {
@@ -392,24 +412,8 @@ bool RaftManager::endElections(int32_t term, RaftInfo::Role result) {

bool RaftManager::isConsensus(size_t num) const noexcept { return num >= GetConsensusForN(nodes_.size() + 1); }

Error RaftManager::clientStatus(size_t index, std::chrono::seconds timeout) {
Error err;
if (!nodes_[index].client.WithTimeout(timeout).Status(true).ok()) {
err = nodes_[index].client.Connect(nodes_[index].dsn, loop_, client::ConnectOpts().WithExpectedClusterID(clusterID_));
if (err.ok()) {
err = nodes_[index].client.WithTimeout(timeout).Status(true);
}
}
return err;
}

Error RaftManager::sendDesiredServerIdToNode(size_t index, int nextServerId) {
Error err = clientStatus(index, kDesiredLeaderTimeout);
if (!err.ok()) {
return err;
}
logTrace("%d Sending desired server ID (%d) to node with server ID %d", serverId_, nextServerId, nodes_[index].serverId);
return nodes_[index].client.WithTimeout(kDesiredLeaderTimeout).SetDesiredLeaderId(nextServerId);
bool RaftManager::hasRecentLeadersPing(RaftManager::ClockT::time_point now) const noexcept {
return (now - lastLeaderPingTs_.load()) < kMinLeaderAwaitInterval;
}

} // namespace cluster
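The rewritten `SendDesiredLeaderId` builds a separate, temporary client set and stops it through a `std::shared_ptr<void>` with a custom deleter, an ad-hoc scope guard that runs on every exit path, early returns included. A standalone sketch of that idiom:

```cpp
#include <iostream>
#include <memory>

int run(bool failEarly) {
    // The deleter fires exactly once when guard leaves scope, no matter how.
    std::shared_ptr<void> guard(nullptr, [](void*) { std::cout << "cleanup\n"; });

    if (failEarly) {
        return 1;  // cleanup still runs here
    }
    // ... normal work ...
    return 0;
}

int main() { return run(true); }
```

A dedicated RAII guard type would express the same intent more directly; the `shared_ptr<void>` spelling is simply the convention already used in this file.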
13 changes: 5 additions & 8 deletions cpp_src/cluster/raftmanager.h
@@ -23,18 +23,15 @@ class RaftManager {

void SetTerminateFlag(bool val) noexcept { terminate_ = val; }
void Configure(const ReplicationConfigData &, const ClusterConfigData &);
RaftInfo::Role Elections();
bool LeaderIsAvailable(ClockT::time_point now);
RaftInfo::Role Elections(bool forceElectionsRestart);
bool LeaderIsAvailable(ClockT::time_point now) const noexcept;
bool FollowersAreAvailable();
int32_t GetLeaderId() const { return getLeaderId(voteData_.load()); }
RaftInfo::Role GetRole() const { return getRole(voteData_.load()); }
int32_t GetTerm() const { return getTerm(voteData_.load()); }
Error SuggestLeader(const cluster::NodeData &suggestion, cluster::NodeData &response);
Error SendDesiredLeaderId(int serverId);
void SetDesiredLeaderId(int serverId) {
nextServerId_.SetNextServerId(serverId);
lastLeaderPingTs_ = {ClockT::time_point()};
}
void SetDesiredLeaderId(int serverId);
int GetDesiredLeaderId() { return nextServerId_.GetNextServerId(); }
Error LeadersPing(const cluster::NodeData &);
void AwaitTermination();
@@ -86,8 +83,8 @@ class RaftManager {
int32_t beginElectionsTerm(int presetLeader);
bool endElections(int32_t term, RaftInfo::Role result);
bool isConsensus(size_t num) const noexcept;
Error sendDesiredServerIdToNode(size_t index, int nextServerId);
Error clientStatus(size_t index, std::chrono::seconds timeout);
bool hasRecentLeadersPing(ClockT::time_point now) const noexcept;
client::ConnectOpts createConnectionOpts() const { return client::ConnectOpts().WithExpectedClusterID(clusterID_); }

net::ev::dynamic_loop &loop_;
ReplicationStatsCollector statsCollector_;
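`isConsensus` and the `okCount` check in `SendDesiredLeaderId` both compare against `GetConsensusForN(nodes_.size() + 1)`, i.e. a quorum over all voters including this node. Assuming the usual Raft majority formula (the real helper may differ), that amounts to:

```cpp
#include <cstddef>

// Assumed majority rule: strictly more than half of n voters.
constexpr std::size_t GetConsensusForN(std::size_t n) noexcept { return n / 2 + 1; }

static_assert(GetConsensusForN(1) == 1);
static_assert(GetConsensusForN(3) == 2);  // a 3-node cluster tolerates 1 failure
static_assert(GetConsensusForN(5) == 3);  // a 5-node cluster tolerates 2 failures
```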