Skip to content

Commit

Permalink
DAOS-2181 raft: Avoid passing raft_node_t pointers around (daos-stack#21
Browse files Browse the repository at this point in the history
)

Passing around pointers to raft_node_t objects and referencing them
all over the code results in errors and crashes due to dangling
pointers when nodes are removed. It is safer to use the node ID to
look up a node from the server's node list.

In the future, it would be ideal to introduce reference counting and
thread safety to Raft as a whole and the node list in particular.

Signed-off-by: Omkar Kulkarni <[email protected]>
  • Loading branch information
okulkarni authored and liw committed Mar 8, 2019
1 parent b9c7290 commit e6c4369
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 52 deletions.
16 changes: 10 additions & 6 deletions include/raft_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ typedef struct {
int election_timeout_rand;
int request_timeout;

/* what this node thinks is the node ID of the current leader, or NULL if
* there isn't a known current leader. */
raft_node_t* current_leader;
/* what this node thinks is the node ID of the current leader,
* or -1 if there isn't a known current leader. */
int leader_id;

/* my node ID */
int node_id;

/* callbacks */
raft_cbs_t cb;
void* udata;

/* my node ID */
raft_node_t* node;

/* the log which has a voting cfg change, otherwise -1 */
int voting_cfg_change_log_idx;

Expand Down Expand Up @@ -108,6 +108,10 @@ void raft_set_state(raft_server_t* me_, int state);

int raft_get_state(raft_server_t* me_);

/**
* @return 1 if node ID matches the server; 0 otherwise */
int raft_is_self(raft_server_t* me_, raft_node_t* node);

raft_node_t* raft_node_new(void* udata, int id);

void raft_node_free(raft_node_t* me_);
Expand Down
2 changes: 1 addition & 1 deletion src/raft_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,5 +194,5 @@ int raft_node_is_addition_committed(raft_node_t* me_)
int raft_node_get_id(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
return me->id;
return (NULL == me) ? -1 : me->id;
}
66 changes: 36 additions & 30 deletions src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ raft_server_t* raft_new()
}
me->voting_cfg_change_log_idx = -1;
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;
me->leader_id = -1;

me->snapshot_in_progress = 0;
raft_set_snapshot_metadata((raft_server_t*)me, 0, 0);
Expand Down Expand Up @@ -106,11 +106,11 @@ void raft_clear(raft_server_t* me_)
raft_randomize_election_timeout(me_);
me->voting_cfg_change_log_idx = -1;
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;
me->leader_id = -1;
me->commit_idx = 0;
me->last_applied_idx = 0;
me->num_nodes = 0;
me->node = NULL;
me->node_id = -1;
log_clear(me->log);
}

Expand Down Expand Up @@ -150,7 +150,7 @@ void raft_become_leader(raft_server_t* me_)
{
raft_node_t* node = me->nodes[i];

if (me->node == node || !raft_node_is_active(node))
if (raft_is_self(me_, node) || !raft_node_is_active(node))
continue;

raft_node_set_next_idx(node, raft_get_current_idx(me_) + 1);
Expand All @@ -171,8 +171,8 @@ int raft_become_candidate(raft_server_t* me_)
return e;
for (i = 0; i < me->num_nodes; i++)
raft_node_vote_for_me(me->nodes[i], 0);
raft_vote(me_, me->node);
me->current_leader = NULL;
raft_vote_for_nodeid(me_, me->node_id);
me->leader_id = -1;
raft_set_state(me_, RAFT_STATE_CANDIDATE);

raft_randomize_election_timeout(me_);
Expand All @@ -182,7 +182,7 @@ int raft_become_candidate(raft_server_t* me_)
{
raft_node_t* node = me->nodes[i];

if (me->node != node &&
if (!raft_is_self(me_, node) &&
raft_node_is_active(node) &&
raft_node_is_voting(node))
{
Expand Down Expand Up @@ -308,7 +308,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (me->current_term != r->term)
Expand Down Expand Up @@ -370,7 +370,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
for (i = 0; i < me->num_nodes; i++)
{
raft_node_t* tmpnode = me->nodes[i];
if (me->node != tmpnode &&
if (!raft_is_self(me_, tmpnode) &&
raft_node_is_active(tmpnode) &&
raft_node_is_voting(tmpnode) &&
point <= raft_node_get_match_idx(tmpnode))
Expand Down Expand Up @@ -434,7 +434,7 @@ int raft_recv_appendentries(
}

/* update current leader because ae->term is up to date */
me->current_leader = node;
me->leader_id = raft_node_get_id(node);

me->timeout_elapsed = 0;

Expand Down Expand Up @@ -587,7 +587,7 @@ int raft_recv_requestvote(raft_server_t* me_,
goto done;
}
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
}

if (__should_grant_vote(me, vr))
Expand All @@ -603,7 +603,7 @@ int raft_recv_requestvote(raft_server_t* me_,
r->vote_granted = 0;

/* there must be in an election. */
me->current_leader = NULL;
me->leader_id = -1;

me->timeout_elapsed = 0;
}
Expand Down Expand Up @@ -661,7 +661,7 @@ int raft_recv_requestvote_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (raft_get_current_term(me_) != r->term)
Expand Down Expand Up @@ -730,7 +730,7 @@ int raft_recv_installsnapshot(raft_server_t* me_,
if (!raft_is_follower(me_))
raft_become_follower(me_);

me->current_leader = node;
me->leader_id = raft_node_get_id(node);
me->timeout_elapsed = 0;

if (is->last_idx <= raft_get_commit_idx(me_))
Expand Down Expand Up @@ -778,7 +778,7 @@ int raft_recv_installsnapshot_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (me->current_term != r->term)
Expand Down Expand Up @@ -841,8 +841,7 @@ int raft_recv_entry(raft_server_t* me_,
{
raft_node_t* node = me->nodes[i];

if (me->node == node ||
!node ||
if (!node || raft_is_self(me_, node) ||
!raft_node_is_active(node) ||
!raft_node_is_voting(node))
continue;
Expand Down Expand Up @@ -875,7 +874,7 @@ int raft_send_requestvote(raft_server_t* me_, raft_node_t* node)
int e = 0;

assert(node);
assert(node != me->node);
assert(!raft_is_self(me_, node));

__log(me_, node, "sending requestvote to: %d", raft_node_get_id(node));

Expand Down Expand Up @@ -932,7 +931,7 @@ int raft_apply_entry(raft_server_t* me_)

int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, log_idx);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);

switch (ety->type) {
case RAFT_LOGTYPE_ADD_NODE:
Expand All @@ -943,13 +942,16 @@ int raft_apply_entry(raft_server_t* me_)
break;
case RAFT_LOGTYPE_REMOVE_NODE:
/* Do not remove node if there are pending entries that affect it */
if (raft_node_get_offered_idx(node) == log_idx)
if (node && raft_node_get_offered_idx(node) == log_idx)
raft_remove_node(me_, node);
break;
}
raft_node_set_applied_idx(node, log_idx);
if (raft_node_get_offered_idx(node) == log_idx)
raft_node_set_offered_idx(node, -1);
if (node)
{
raft_node_set_applied_idx(node, log_idx);
if (raft_node_get_offered_idx(node) == log_idx)
raft_node_set_offered_idx(node, -1);
}

return 0;
}
Expand Down Expand Up @@ -985,7 +987,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
raft_server_private_t* me = (raft_server_private_t*)me_;

assert(node);
assert(node != me->node);
assert(!raft_is_self(me_, node));

if (!(me->cb.send_appendentries))
return -1;
Expand Down Expand Up @@ -1026,7 +1028,8 @@ int raft_send_appendentries_all(raft_server_t* me_)
me->timeout_elapsed = 0;
for (i = 0; i < me->num_nodes; i++)
{
if (me->node == me->nodes[i] || !raft_node_is_active(me->nodes[i]))
if (raft_is_self(me_, me->nodes[i]) ||
!raft_node_is_active(me->nodes[i]))
continue;

e = raft_send_appendentries(me_, me->nodes[i]);
Expand Down Expand Up @@ -1060,7 +1063,7 @@ raft_node_t* raft_add_node_internal(raft_server_t* me_, raft_entry_t *ety, void*
me->nodes = p;
me->nodes[me->num_nodes - 1] = node;
if (is_self)
me->node = me->nodes[me->num_nodes - 1];
me->node_id = id;

node = me->nodes[me->num_nodes - 1];

Expand Down Expand Up @@ -1125,7 +1128,7 @@ int raft_get_nvotes_for_me(raft_server_t* me_)

for (i = 0, votes = 0; i < me->num_nodes; i++)
{
if (me->node != me->nodes[i] &&
if (!raft_is_self(me_, me->nodes[i]) &&
raft_node_is_active(me->nodes[i]) &&
raft_node_is_voting(me->nodes[i]) &&
raft_node_has_vote_for_me(me->nodes[i]))
Expand Down Expand Up @@ -1241,8 +1244,9 @@ void raft_offer_log(raft_server_t* me_, raft_entry_t* entries,
int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_),
ety, idx + i);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
raft_node_set_offered_idx(node, idx + i);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);
if (node)
raft_node_set_offered_idx(node, idx + i);
}
}

Expand Down Expand Up @@ -1422,7 +1426,9 @@ static void raft_reset_node_indices(raft_server_t *me_, int max_idx)
continue;
int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, idx);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);
if (!node)
continue;
int off_idx = raft_node_get_offered_idx(node);
if (off_idx > max_idx || off_idx < idx)
{
Expand Down
26 changes: 11 additions & 15 deletions src/raft_server_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ void raft_set_request_timeout(raft_server_t* me_, int millisec)
int raft_get_nodeid(raft_server_t* me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
if (!me->node)
return -1;
return raft_node_get_id(me->node);
return me->node_id;
}

int raft_get_election_timeout(raft_server_t* me_)
Expand Down Expand Up @@ -140,7 +138,7 @@ void raft_set_state(raft_server_t* me_, int state)
raft_server_private_t* me = (raft_server_private_t*)me_;
/* if became the leader, then update the current leader entry */
if (state == RAFT_STATE_LEADER)
me->current_leader = me->node;
me->leader_id = me->node_id;
me->state = state;
}

Expand All @@ -164,13 +162,7 @@ raft_node_t* raft_get_node(raft_server_t *me_, int nodeid)
raft_node_t* raft_get_my_node(raft_server_t *me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
int i;

for (i = 0; i < me->num_nodes; i++)
if (raft_get_nodeid(me_) == raft_node_get_id(me->nodes[i]))
return me->nodes[i];

return NULL;
return raft_get_node(me_, me->node_id);
}

raft_node_t* raft_get_node_from_idx(raft_server_t* me_, const int idx)
Expand All @@ -182,15 +174,13 @@ raft_node_t* raft_get_node_from_idx(raft_server_t* me_, const int idx)
int raft_get_current_leader(raft_server_t* me_)
{
raft_server_private_t* me = (void*)me_;
if (me->current_leader)
return raft_node_get_id(me->current_leader);
return -1;
return me->leader_id;
}

raft_node_t* raft_get_current_leader_node(raft_server_t* me_)
{
raft_server_private_t* me = (void*)me_;
return me->current_leader;
return raft_get_node(me_, me->leader_id);
}

void* raft_get_udata(raft_server_t* me_)
Expand All @@ -213,6 +203,12 @@ int raft_is_candidate(raft_server_t* me_)
return raft_get_state(me_) == RAFT_STATE_CANDIDATE;
}

int raft_is_self(raft_server_t* me_, raft_node_t* node)
{
raft_server_private_t* me = (void*)me_;
return (node && raft_node_get_id(node) == me->node_id);
}

int raft_get_last_log_term(raft_server_t* me_)
{
int current_idx = raft_get_current_idx(me_);
Expand Down

0 comments on commit e6c4369

Please sign in to comment.