Make catching_up_ flag durable (#562)
* If a new member joins a cluster but restarts while receiving the
snapshot, its in-memory `catching_up_` flag is lost, causing the join
process to stall.

* To prevent this issue, the `catching_up_` flag will be stored as
part of the `srv_state`. After the server restarts, the leader and
the server will attempt the snapshot transmission again.

* The `srv_state` format has been changed, but it will remain
backward compatible.
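
An illustrative sketch of the compatibility scheme (editor's example, not
code from this commit; the field set and byte order are simplified, and
`voted_for` is omitted for brevity). The idea is a leading version byte:
a v2 writer appends the new flag, and a v2 reader consumes it only when
the version byte says it exists, so v1 blobs keep decoding with a safe
default:

#include <cassert>
#include <cstdint>
#include <vector>

struct State {
    uint64_t term        = 0;
    bool     et_allowed  = true;
    bool     catching_up = false;   // the new durable flag (v2)
};

// Encode with a leading version byte; v2 appends the new flag.
std::vector<uint8_t> encode(const State& s, uint8_t version) {
    std::vector<uint8_t> out;
    out.push_back(version);
    for (int i = 7; i >= 0; --i)    // u64 term, big-endian in this sketch
        out.push_back(uint8_t(s.term >> (8 * i)));
    out.push_back(s.et_allowed ? 1 : 0);
    if (version >= 2) out.push_back(s.catching_up ? 1 : 0);
    return out;
}

// A v2-aware decoder that still accepts v1 blobs: the trailing field is
// read only if the version says it exists AND bytes remain, which also
// guards against truncated input.
State decode(const std::vector<uint8_t>& buf) {
    size_t pos = 0;
    uint8_t ver = buf[pos++];
    State s;
    for (int i = 0; i < 8; ++i) s.term = (s.term << 8) | buf[pos++];
    s.et_allowed = (buf[pos++] == 1);
    if (ver >= 2 && pos < buf.size()) s.catching_up = (buf[pos++] == 1);
    return s;
}

int main() {
    State s; s.term = 7; s.catching_up = true;
    assert(decode(encode(s, 2)).catching_up);    // v2 round-trip keeps the flag
    assert(!decode(encode(s, 1)).catching_up);   // v1 blob: safe default
    return 0;
}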
greensky00 authored Jan 18, 2025
1 parent bf5e149 commit 3008bef
Showing 12 changed files with 190 additions and 38 deletions.
9 changes: 1 addition & 8 deletions include/libnuraft/raft_server.hxx
@@ -198,7 +198,7 @@ public:
*
* @return `true` if it is in catch-up mode.
*/
- bool is_catching_up() const { return catching_up_; }
+ bool is_catching_up() const { return state_->is_catching_up(); }

/**
* Check if this server is receiving snapshot from leader.
@@ -1182,13 +1182,6 @@ protected:
*/
bool config_changing_;

- /**
- * `true` if this server falls behind leader so that
- * catching up the latest log. It will not receive
- * normal `append_entries` request while in catch-up status.
- */
- std::atomic<bool> catching_up_;
-
/**
* `true` if this server receives out of log range message
* from leader. Once this flag is set, this server will not
54 changes: 43 additions & 11 deletions include/libnuraft/srv_state.hxx
@@ -37,12 +37,14 @@ public:
: term_(0L)
, voted_for_(-1)
, election_timer_allowed_(true)
+ , catching_up_(false)
{}

- srv_state(ulong term, int voted_for, bool et_allowed)
+ srv_state(ulong term, int voted_for, bool et_allowed, bool catching_up)
: term_(term)
, voted_for_(voted_for)
, election_timer_allowed_(et_allowed)
+ , catching_up_(catching_up)
{}

/**
@@ -67,17 +69,22 @@ public:
static ptr<srv_state> deserialize_v0(buffer& buf) {
ulong term = buf.get_ulong();
int voted_for = buf.get_int();
- return cs_new<srv_state>(term, voted_for, true);
+ return cs_new<srv_state>(term, voted_for, true, false);
}

static ptr<srv_state> deserialize_v1p(buffer& buf) {
buffer_serializer bs(buf);
uint8_t ver = bs.get_u8();
- (void)ver;

ulong term = bs.get_u64();
int voted_for = bs.get_i32();
bool et_allowed = (bs.get_u8() == 1);
- return cs_new<srv_state>(term, voted_for, et_allowed);
+ bool catching_up = false;
+ if (ver >= 2 && bs.pos() < buf.size()) {
+     catching_up = (bs.get_u8() == 1);
+ }
+
+ return cs_new<srv_state>(term, voted_for, et_allowed, catching_up);
}

void set_inc_term_func(inc_term_func to) {
@@ -114,10 +121,18 @@ public:
return election_timer_allowed_;
}

+ bool is_catching_up() const {
+     return catching_up_;
+ }
+
void allow_election_timer(bool to) {
election_timer_allowed_ = to;
}

+ void set_catching_up(bool to) {
+     catching_up_ = to;
+ }
+
ptr<buffer> serialize() const {
return serialize_v1p(CURRENT_VERSION);
}
@@ -135,21 +150,30 @@ public:
// version 1 byte
// term 8 bytes
// voted_for 4 bytes
- // election timer 1 byte
- ptr<buffer> buf = buffer::alloc( sizeof(uint8_t) +
-                                  sizeof(uint64_t) +
-                                  sizeof(int32_t) +
-                                  sizeof(uint8_t) );
+ // election timer 1 byte (since v1)
+ // catching up 1 byte (since v2)
+
+ size_t buf_len = sizeof(uint8_t) +
+                  sizeof(uint64_t) +
+                  sizeof(int32_t) +
+                  sizeof(uint8_t);
+ if (version >= 2) {
+     buf_len += sizeof(uint8_t);
+ }
+ ptr<buffer> buf = buffer::alloc(buf_len);
buffer_serializer bs(buf);
bs.put_u8(version);
bs.put_u64(term_);
bs.put_i32(voted_for_);
- bs.put_u8( election_timer_allowed_ ? 1 : 0 );
+ bs.put_u8(election_timer_allowed_ ? 1 : 0);
+ if (version >= 2) {
+     bs.put_u8(catching_up_ ? 1 : 0);
+ }
return buf;
}

private:
- const uint8_t CURRENT_VERSION = 1;
+ const uint8_t CURRENT_VERSION = 2;

/**
* Term.
@@ -167,6 +191,14 @@ private:
*/
std::atomic<bool> election_timer_allowed_;

+ /**
+ * true if this server has joined the cluster but has not yet
+ * fully caught up with the latest log. While in the catch-up status,
+ * this server will not receive normal append_entries requests.
+ */
+ std::atomic<bool> catching_up_;
+

/**
* Custom callback function for increasing term.
* If not given, term will be increased by 1.
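Editor's note: a round-trip check of the new format, sketched against the
accessors visible in this diff. The exact positioning semantics of NuRaft's
`buffer`/`buffer_serializer` may differ, so treat this as an assumption-laden
sketch rather than a test against the real API:

// Sketch only; assumes the freshly serialized buffer reads from position 0.
ptr<srv_state> st = cs_new<srv_state>(5 /*term*/, 2 /*voted_for*/,
                                      true /*et_allowed*/, true /*catching_up*/);
ptr<buffer> buf = st->serialize();     // v2: has the trailing catching-up byte
buf->pos(0);                           // assumed: rewind before reading back
ptr<srv_state> restored = srv_state::deserialize_v1p(*buf);
assert(restored->is_catching_up());    // the flag survives a restart

A v1 blob simply lacks the trailing byte; `ver >= 2` fails in
`deserialize_v1p()` and `catching_up` falls back to `false`, which is what
keeps the format change backward compatible.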
5 changes: 3 additions & 2 deletions src/handle_append_entries.cxx
@@ -612,7 +612,7 @@ ptr<req_msg> raft_server::create_append_entries_req(ptr<peer>& pp ,
ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
{
bool supp_exp_warning = false;
- if (catching_up_) {
+ if (state_->is_catching_up()) {
// WARNING:
// We should clear the `catching_up_` flag only after this node's
// config has been added to the cluster config. Otherwise, if we
@@ -623,7 +623,8 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
ptr<srv_config> my_config = cur_config->get_server(id_);
if (my_config && !my_config->is_new_joiner()) {
p_in("catch-up process is done, clearing the flag");
- catching_up_ = false;
+ state_->set_catching_up(false);
+ ctx_->state_mgr_->save_state(*state_);
}
supp_exp_warning = true;
}
7 changes: 4 additions & 3 deletions src/handle_commit.cxx
@@ -759,7 +759,7 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
steps_to_down_ = 0;
if (!(*it)->is_new_joiner() &&
role_ == srv_role::follower &&
- catching_up_) {
+ state_->is_catching_up()) {
// Except for new joiner type, if this server is added
// to the cluster config, that means the sync is done.
// Start election timer without waiting for
@@ -770,7 +770,8 @@
// that is also notified by a new cluster config.
p_in("now this node is the part of cluster, "
"catch-up process is done, clearing the flag");
- catching_up_ = false;
+ state_->set_catching_up(false);
+ ctx_->state_mgr_->save_state(*state_);
restart_election_timer();
}
}
@@ -827,7 +828,7 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
for ( std::vector<int32>::const_iterator it = srvs_removed.begin();
it != srvs_removed.end(); ++it ) {
int32 srv_removed = *it;
- if (srv_removed == id_ && !catching_up_) {
+ if (srv_removed == id_ && !state_->is_catching_up()) {
p_in("this server (%d) has been removed from the cluster, "
"will step down itself soon. config log idx %" PRIu64,
id_,
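Editor's note: both sites that clear the flag (handle_append_entries above
and reconfigure here) repeat the same mutate-then-persist pair. If more
durable fields accumulate, a small wrapper could keep the pairing from being
forgotten; hypothetical, not part of this commit or of NuRaft:

// Hypothetical helper mirroring the pattern in this commit.
void raft_server::set_catching_up_durable(bool to) {
    state_->set_catching_up(to);
    ctx_->state_mgr_->save_state(*state_);  // persist immediately so a restart
                                            // cannot lose the flag
}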
8 changes: 4 additions & 4 deletions src/handle_join_leave.cxx
@@ -176,7 +176,7 @@ ptr<resp_msg> raft_server::handle_join_cluster_req(req_msg& req) {
// in progress. It should gracefully handle the new request and should
// not ruin the current request.
bool reset_commit_idx = true;
- if (catching_up_) {
+ if (state_->is_catching_up()) {
p_wn("this server is already in log syncing mode, "
"but let's do it again: sm idx %" PRIu64 ", quick commit idx %" PRIu64 ", "
"will not reset commit index",
@@ -186,7 +186,7 @@
}

p_in("got join cluster req from leader %d", req.get_src());
- catching_up_ = true;
+ state_->set_catching_up(true);
role_ = srv_role::follower;
index_at_becoming_leader_ = 0;
leader_ = req.get_src();
@@ -343,7 +343,7 @@ ptr<resp_msg> raft_server::handle_log_sync_req(req_msg& req) {

p_db("entries size %d, type %d, catching_up %s\n",
(int)entries.size(), (int)entries[0]->get_val_type(),
- (catching_up_)?"true":"false");
+ state_->is_catching_up() ? "true" : "false");
if ( entries.size() != 1 ||
entries[0]->get_val_type() != log_val_type::log_pack ) {
p_wn("receive an invalid LogSyncRequest as the log entry value "
@@ -352,7 +352,7 @@
return resp;
}

- if (!catching_up_) {
+ if (!state_->is_catching_up()) {
p_wn("This server is ready for cluster, ignore the request, "
"my next log idx %" PRIu64 "", resp->get_next_idx());
return resp;
4 changes: 2 additions & 2 deletions src/handle_snapshot_sync.cxx
@@ -236,7 +236,7 @@ ptr<req_msg> raft_server::create_sync_snapshot_req(ptr<peer>& pp,
}

ptr<resp_msg> raft_server::handle_install_snapshot_req(req_msg& req, std::unique_lock<std::recursive_mutex>& guard) {
- if (req.get_term() == state_->get_term() && !catching_up_) {
+ if (req.get_term() == state_->get_term() && !state_->is_catching_up()) {
if (role_ == srv_role::candidate) {
become_follower();

@@ -263,7 +263,7 @@ ptr<resp_msg> raft_server::handle_install_snapshot_req(req_msg& req, std::unique
req.get_src(),
log_store_->next_slot() );

- if (!catching_up_ && req.get_term() < state_->get_term()) {
+ if (!state_->is_catching_up() && req.get_term() < state_->get_term()) {
p_wn("received an install snapshot request (%" PRIu64 ") which has lower term "
"than this server (%" PRIu64 "), decline the request",
req.get_term(), state_->get_term());
4 changes: 2 additions & 2 deletions src/handle_timeout.cxx
@@ -163,7 +163,7 @@ void raft_server::restart_election_timer() {
// don't start the election timer while this server is still catching up the logs
// or this server is the leader
recur_lock(lock_);
- if (catching_up_ || role_ == srv_role::leader) {
+ if (state_->is_catching_up() || role_ == srv_role::leader) {
return;
}

@@ -243,7 +243,7 @@ void raft_server::handle_election_timeout() {
return;
}

- if (catching_up_) {
+ if (state_->is_catching_up()) {
// this is a new server for the cluster, will not send out vote req
// until conf that includes this srv is committed
p_in("election timeout while joining the cluster, ignore it.");
6 changes: 3 additions & 3 deletions src/handle_vote.cxx
@@ -323,7 +323,7 @@ ptr<resp_msg> raft_server::handle_vote_req(req_msg& req) {
p_in("[VOTE REQ] force vote request, will ignore priority");
ignore_priority = true;
}
- if (catching_up_) {
+ if (state_->is_catching_up()) {
p_in("[VOTE REQ] this server is catching-up with leader, "
"will ignore priority");
ignore_priority = true;
@@ -437,10 +437,10 @@ ptr<resp_msg> raft_server::handle_prevote_req(req_msg& req) {
// normal append_entries request so that `hb_alive_` may not
// be cleared properly. Hence, it should accept any pre-vote
// requests.
- if (catching_up_) {
+ if (state_->is_catching_up()) {
p_in("this server is catching up, always accept pre-vote");
}
- if (!hb_alive_ || catching_up_) {
+ if (!hb_alive_ || state_->is_catching_up()) {
p_in("pre-vote decision: O (grant)");
resp->accept(log_store_->next_slot());
} else {
3 changes: 2 additions & 1 deletion src/raft_server.cxx
@@ -64,7 +64,6 @@ raft_server::raft_server(context* ctx, const init_options& opt)
, hb_alive_(false)
, election_completed_(true)
, config_changing_(false)
- , catching_up_(false)
, out_of_log_range_(false)
, data_fresh_(false)
, stopping_(false)
@@ -160,6 +159,8 @@ raft_server::raft_server(context* ctx, const init_options& opt)
<< "term " << state_->get_term() << "\n"
<< "election timer " << ( state_->is_election_timer_allowed()
? "allowed" : "not allowed" ) << "\n"
<< "catching-up " << ( state_->is_catching_up()
? "yes" : "no" ) << "\n"
<< "log store start " << log_store_->start_index()
<< ", end " << log_store_->next_slot() - 1 << "\n"
<< "config log idx " << c_conf->get_log_idx()
15 changes: 13 additions & 2 deletions tests/unit/fake_network.cxx
@@ -149,19 +149,23 @@ ptr<FakeClient> FakeNetwork::findClient(const std::string& endpoint) {
bool FakeNetwork::delieverReqTo(const std::string& endpoint,
bool random_order)
{
+ SimpleLogger* ll = base->getLogger();
+
// this: source (sending request)
// conn->dstNet (endpoint): destination (sending response)
ptr<FakeClient> conn = findClient(endpoint);

// If destination is offline, make failure.
- if (!conn->isDstOnline()) return makeReqFail(endpoint, random_order);
+ if (!conn->isDstOnline()) {
+     _log_info(ll, "destination %s is offline", endpoint.c_str());
+     return makeReqFail(endpoint, random_order);
+ }

auto pkg_entry = conn->pendingReqs.begin();
if (pkg_entry == conn->pendingReqs.end()) return false;

ReqPkg& pkg = *pkg_entry;

- SimpleLogger* ll = base->getLogger();
_log_info(ll, "[BEGIN] send/process %s -> %s, %s",
myEndpoint.c_str(), endpoint.c_str(),
msg_type_to_string( pkg.req->get_type() ).c_str() );
@@ -232,6 +236,11 @@ bool FakeNetwork::handleRespFrom(const std::string& endpoint,
// Copy shared pointer for the case of reconnection,
// as it drops all resps.
RespPkg pkg = *pkg_entry;
+ if (!pkg.resp) {
+     _log_info(ll, "empty response from %s", endpoint.c_str());
+     return false;
+ }
+
_log_info(ll, "[BEGIN] deliver response %s -> %s, %s",
endpoint.c_str(), myEndpoint.c_str(),
msg_type_to_string( pkg.resp->get_type() ).c_str() );
@@ -277,6 +286,8 @@ void FakeNetwork::stop() {
}

void FakeNetwork::shutdown() {
+ goesOffline();
+
std::lock_guard<std::mutex> ll(clientsLock);
for (auto& entry: clients) {
ptr<FakeClient>& cc = entry.second;
2 changes: 2 additions & 0 deletions tests/unit/raft_package_fake.hxx
@@ -97,6 +97,8 @@ public:
const raft_server::init_options& opt =
raft_server::init_options(false, true, true))
{
+ fNet->goesOnline();
+
if (!given_params) {
params.with_election_timeout_lower(0);
params.with_election_timeout_upper(10000);