Skip to content

Commit

Permalink
DAOS-2181 raft: Avoid passing raft_node_t pointers around
Browse files Browse the repository at this point in the history
Passing around pointers to raft_node_t objects and referencing them
all over the code results in errors and crashes due to dangling
pointers when nodes are removed. It is safer to use the node ID to
look up a node from the server's node list.

In the future, it would be ideal to introduce reference counting and
thread safety to Raft as a whole and the node list in particular.

Signed-off-by: Omkar Kulkarni <[email protected]>
  • Loading branch information
Omkar Kulkarni committed Mar 7, 2019
1 parent b9c7290 commit 598d8ea
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 50 deletions.
4 changes: 4 additions & 0 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,10 @@ int raft_is_leader(raft_server_t* me);
* @return 1 if candidate; 0 otherwise */
int raft_is_candidate(raft_server_t* me);

/**
* @return 1 if node ID matches the server; 0 otherwise */
int raft_is_self(raft_server_t* me_, raft_node_t* node);

/**
* @return currently elapsed timeout in milliseconds */
int raft_get_timeout_elapsed(raft_server_t* me);
Expand Down
8 changes: 4 additions & 4 deletions include/raft_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ typedef struct {

/* what this node thinks is the node ID of the current leader, or NULL if
* there isn't a known current leader. */
raft_node_t* current_leader;
int leader_id;

/* my node ID */
int node_id;

/* callbacks */
raft_cbs_t cb;
void* udata;

/* my node ID */
raft_node_t* node;

/* the log which has a voting cfg change, otherwise -1 */
int voting_cfg_change_log_idx;

Expand Down
2 changes: 1 addition & 1 deletion src/raft_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,5 +194,5 @@ int raft_node_is_addition_committed(raft_node_t* me_)
int raft_node_get_id(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
return me->id;
return (NULL == me) ? -1 : me->id;
}
66 changes: 36 additions & 30 deletions src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ raft_server_t* raft_new()
}
me->voting_cfg_change_log_idx = -1;
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;
me->leader_id = -1;

me->snapshot_in_progress = 0;
raft_set_snapshot_metadata((raft_server_t*)me, 0, 0);
Expand Down Expand Up @@ -106,11 +106,11 @@ void raft_clear(raft_server_t* me_)
raft_randomize_election_timeout(me_);
me->voting_cfg_change_log_idx = -1;
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;
me->leader_id = -1;
me->commit_idx = 0;
me->last_applied_idx = 0;
me->num_nodes = 0;
me->node = NULL;
me->node_id = -1;
log_clear(me->log);
}

Expand Down Expand Up @@ -150,7 +150,7 @@ void raft_become_leader(raft_server_t* me_)
{
raft_node_t* node = me->nodes[i];

if (me->node == node || !raft_node_is_active(node))
if (raft_is_self(me_, node) || !raft_node_is_active(node))
continue;

raft_node_set_next_idx(node, raft_get_current_idx(me_) + 1);
Expand All @@ -171,8 +171,8 @@ int raft_become_candidate(raft_server_t* me_)
return e;
for (i = 0; i < me->num_nodes; i++)
raft_node_vote_for_me(me->nodes[i], 0);
raft_vote(me_, me->node);
me->current_leader = NULL;
raft_vote_for_nodeid(me_, me->node_id);
me->leader_id = -1;
raft_set_state(me_, RAFT_STATE_CANDIDATE);

raft_randomize_election_timeout(me_);
Expand All @@ -182,7 +182,7 @@ int raft_become_candidate(raft_server_t* me_)
{
raft_node_t* node = me->nodes[i];

if (me->node != node &&
if (!raft_is_self(me_, node) &&
raft_node_is_active(node) &&
raft_node_is_voting(node))
{
Expand Down Expand Up @@ -308,7 +308,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (me->current_term != r->term)
Expand Down Expand Up @@ -370,7 +370,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
for (i = 0; i < me->num_nodes; i++)
{
raft_node_t* tmpnode = me->nodes[i];
if (me->node != tmpnode &&
if (!raft_is_self(me_, tmpnode) &&
raft_node_is_active(tmpnode) &&
raft_node_is_voting(tmpnode) &&
point <= raft_node_get_match_idx(tmpnode))
Expand Down Expand Up @@ -434,7 +434,7 @@ int raft_recv_appendentries(
}

/* update current leader because ae->term is up to date */
me->current_leader = node;
me->leader_id = raft_node_get_id(node);

me->timeout_elapsed = 0;

Expand Down Expand Up @@ -587,7 +587,7 @@ int raft_recv_requestvote(raft_server_t* me_,
goto done;
}
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
}

if (__should_grant_vote(me, vr))
Expand All @@ -603,7 +603,7 @@ int raft_recv_requestvote(raft_server_t* me_,
r->vote_granted = 0;

/* there must be in an election. */
me->current_leader = NULL;
me->leader_id = -1;

me->timeout_elapsed = 0;
}
Expand Down Expand Up @@ -661,7 +661,7 @@ int raft_recv_requestvote_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (raft_get_current_term(me_) != r->term)
Expand Down Expand Up @@ -730,7 +730,7 @@ int raft_recv_installsnapshot(raft_server_t* me_,
if (!raft_is_follower(me_))
raft_become_follower(me_);

me->current_leader = node;
me->leader_id = raft_node_get_id(node);
me->timeout_elapsed = 0;

if (is->last_idx <= raft_get_commit_idx(me_))
Expand Down Expand Up @@ -778,7 +778,7 @@ int raft_recv_installsnapshot_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (me->current_term != r->term)
Expand Down Expand Up @@ -841,8 +841,7 @@ int raft_recv_entry(raft_server_t* me_,
{
raft_node_t* node = me->nodes[i];

if (me->node == node ||
!node ||
if (!node || raft_is_self(me_, node) ||
!raft_node_is_active(node) ||
!raft_node_is_voting(node))
continue;
Expand Down Expand Up @@ -875,7 +874,7 @@ int raft_send_requestvote(raft_server_t* me_, raft_node_t* node)
int e = 0;

assert(node);
assert(node != me->node);
assert(!raft_is_self(me_, node));

__log(me_, node, "sending requestvote to: %d", raft_node_get_id(node));

Expand Down Expand Up @@ -932,7 +931,7 @@ int raft_apply_entry(raft_server_t* me_)

int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, log_idx);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);

switch (ety->type) {
case RAFT_LOGTYPE_ADD_NODE:
Expand All @@ -943,13 +942,16 @@ int raft_apply_entry(raft_server_t* me_)
break;
case RAFT_LOGTYPE_REMOVE_NODE:
/* Do not remove node if there are pending entries that affect it */
if (raft_node_get_offered_idx(node) == log_idx)
if (node && raft_node_get_offered_idx(node) == log_idx)
raft_remove_node(me_, node);
break;
}
raft_node_set_applied_idx(node, log_idx);
if (raft_node_get_offered_idx(node) == log_idx)
raft_node_set_offered_idx(node, -1);
if (node)
{
raft_node_set_applied_idx(node, log_idx);
if (raft_node_get_offered_idx(node) == log_idx)
raft_node_set_offered_idx(node, -1);
}

return 0;
}
Expand Down Expand Up @@ -985,7 +987,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
raft_server_private_t* me = (raft_server_private_t*)me_;

assert(node);
assert(node != me->node);
assert(!raft_is_self(me_, node));

if (!(me->cb.send_appendentries))
return -1;
Expand Down Expand Up @@ -1026,7 +1028,8 @@ int raft_send_appendentries_all(raft_server_t* me_)
me->timeout_elapsed = 0;
for (i = 0; i < me->num_nodes; i++)
{
if (me->node == me->nodes[i] || !raft_node_is_active(me->nodes[i]))
if (raft_is_self(me_, me->nodes[i]) ||
!raft_node_is_active(me->nodes[i]))
continue;

e = raft_send_appendentries(me_, me->nodes[i]);
Expand Down Expand Up @@ -1060,7 +1063,7 @@ raft_node_t* raft_add_node_internal(raft_server_t* me_, raft_entry_t *ety, void*
me->nodes = p;
me->nodes[me->num_nodes - 1] = node;
if (is_self)
me->node = me->nodes[me->num_nodes - 1];
me->node_id = id;

node = me->nodes[me->num_nodes - 1];

Expand Down Expand Up @@ -1125,7 +1128,7 @@ int raft_get_nvotes_for_me(raft_server_t* me_)

for (i = 0, votes = 0; i < me->num_nodes; i++)
{
if (me->node != me->nodes[i] &&
if (!raft_is_self(me_, me->nodes[i]) &&
raft_node_is_active(me->nodes[i]) &&
raft_node_is_voting(me->nodes[i]) &&
raft_node_has_vote_for_me(me->nodes[i]))
Expand Down Expand Up @@ -1241,8 +1244,9 @@ void raft_offer_log(raft_server_t* me_, raft_entry_t* entries,
int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_),
ety, idx + i);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
raft_node_set_offered_idx(node, idx + i);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);
if (node)
raft_node_set_offered_idx(node, idx + i);
}
}

Expand Down Expand Up @@ -1422,7 +1426,9 @@ static void raft_reset_node_indices(raft_server_t *me_, int max_idx)
continue;
int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, idx);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);
if (!node)
continue;
int off_idx = raft_node_get_offered_idx(node);
if (off_idx > max_idx || off_idx < idx)
{
Expand Down
26 changes: 11 additions & 15 deletions src/raft_server_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ void raft_set_request_timeout(raft_server_t* me_, int millisec)
int raft_get_nodeid(raft_server_t* me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
if (!me->node)
return -1;
return raft_node_get_id(me->node);
return me->node_id;
}

int raft_get_election_timeout(raft_server_t* me_)
Expand Down Expand Up @@ -140,7 +138,7 @@ void raft_set_state(raft_server_t* me_, int state)
raft_server_private_t* me = (raft_server_private_t*)me_;
/* if became the leader, then update the current leader entry */
if (state == RAFT_STATE_LEADER)
me->current_leader = me->node;
me->leader_id = me->node_id;
me->state = state;
}

Expand All @@ -164,13 +162,7 @@ raft_node_t* raft_get_node(raft_server_t *me_, int nodeid)
raft_node_t* raft_get_my_node(raft_server_t *me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
int i;

for (i = 0; i < me->num_nodes; i++)
if (raft_get_nodeid(me_) == raft_node_get_id(me->nodes[i]))
return me->nodes[i];

return NULL;
return raft_get_node(me_, me->node_id);
}

raft_node_t* raft_get_node_from_idx(raft_server_t* me_, const int idx)
Expand All @@ -182,15 +174,13 @@ raft_node_t* raft_get_node_from_idx(raft_server_t* me_, const int idx)
int raft_get_current_leader(raft_server_t* me_)
{
raft_server_private_t* me = (void*)me_;
if (me->current_leader)
return raft_node_get_id(me->current_leader);
return -1;
return me->leader_id;
}

raft_node_t* raft_get_current_leader_node(raft_server_t* me_)
{
raft_server_private_t* me = (void*)me_;
return me->current_leader;
return raft_get_node(me_, me->leader_id);
}

void* raft_get_udata(raft_server_t* me_)
Expand All @@ -213,6 +203,12 @@ int raft_is_candidate(raft_server_t* me_)
return raft_get_state(me_) == RAFT_STATE_CANDIDATE;
}

int raft_is_self(raft_server_t* me_, raft_node_t* node)
{
raft_server_private_t* me = (void*)me_;
return (node && raft_node_get_id(node) == me->node_id);
}

int raft_get_last_log_term(raft_server_t* me_)
{
int current_idx = raft_get_current_idx(me_);
Expand Down

0 comments on commit 598d8ea

Please sign in to comment.