Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-2181 raft: Avoid passing raft_node_t pointers around #21

Merged
merged 2 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions include/raft_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ typedef struct {
int election_timeout_rand;
int request_timeout;

/* what this node thinks is the node ID of the current leader, or NULL if
* there isn't a known current leader. */
raft_node_t* current_leader;
/* what this node thinks is the node ID of the current leader,
* or -1 if there isn't a known current leader. */
int leader_id;

/* my node ID */
int node_id;

/* callbacks */
raft_cbs_t cb;
void* udata;

/* my node ID */
raft_node_t* node;

/* the log which has a voting cfg change, otherwise -1 */
int voting_cfg_change_log_idx;

Expand Down Expand Up @@ -108,6 +108,10 @@ void raft_set_state(raft_server_t* me_, int state);

int raft_get_state(raft_server_t* me_);

/**
* @return 1 if node ID matches the server; 0 otherwise */
int raft_is_self(raft_server_t* me_, raft_node_t* node);

raft_node_t* raft_node_new(void* udata, int id);

void raft_node_free(raft_node_t* me_);
Expand Down
2 changes: 1 addition & 1 deletion src/raft_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,5 +194,5 @@ int raft_node_is_addition_committed(raft_node_t* me_)
int raft_node_get_id(raft_node_t* me_)
{
raft_node_private_t* me = (raft_node_private_t*)me_;
return me->id;
return (NULL == me) ? -1 : me->id;
}
66 changes: 36 additions & 30 deletions src/raft_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ raft_server_t* raft_new()
}
me->voting_cfg_change_log_idx = -1;
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;
me->leader_id = -1;

me->snapshot_in_progress = 0;
raft_set_snapshot_metadata((raft_server_t*)me, 0, 0);
Expand Down Expand Up @@ -106,11 +106,11 @@ void raft_clear(raft_server_t* me_)
raft_randomize_election_timeout(me_);
me->voting_cfg_change_log_idx = -1;
raft_set_state((raft_server_t*)me, RAFT_STATE_FOLLOWER);
me->current_leader = NULL;
me->leader_id = -1;
me->commit_idx = 0;
me->last_applied_idx = 0;
me->num_nodes = 0;
me->node = NULL;
me->node_id = -1;
log_clear(me->log);
}

Expand Down Expand Up @@ -150,7 +150,7 @@ void raft_become_leader(raft_server_t* me_)
{
raft_node_t* node = me->nodes[i];

if (me->node == node || !raft_node_is_active(node))
if (raft_is_self(me_, node) || !raft_node_is_active(node))
continue;

raft_node_set_next_idx(node, raft_get_current_idx(me_) + 1);
Expand All @@ -171,8 +171,8 @@ int raft_become_candidate(raft_server_t* me_)
return e;
for (i = 0; i < me->num_nodes; i++)
raft_node_vote_for_me(me->nodes[i], 0);
raft_vote(me_, me->node);
me->current_leader = NULL;
raft_vote_for_nodeid(me_, me->node_id);
me->leader_id = -1;
raft_set_state(me_, RAFT_STATE_CANDIDATE);

raft_randomize_election_timeout(me_);
Expand All @@ -182,7 +182,7 @@ int raft_become_candidate(raft_server_t* me_)
{
raft_node_t* node = me->nodes[i];

if (me->node != node &&
if (!raft_is_self(me_, node) &&
raft_node_is_active(node) &&
raft_node_is_voting(node))
{
Expand Down Expand Up @@ -308,7 +308,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (me->current_term != r->term)
Expand Down Expand Up @@ -370,7 +370,7 @@ int raft_recv_appendentries_response(raft_server_t* me_,
for (i = 0; i < me->num_nodes; i++)
{
raft_node_t* tmpnode = me->nodes[i];
if (me->node != tmpnode &&
if (!raft_is_self(me_, tmpnode) &&
raft_node_is_active(tmpnode) &&
raft_node_is_voting(tmpnode) &&
point <= raft_node_get_match_idx(tmpnode))
Expand Down Expand Up @@ -434,7 +434,7 @@ int raft_recv_appendentries(
}

/* update current leader because ae->term is up to date */
me->current_leader = node;
me->leader_id = raft_node_get_id(node);

me->timeout_elapsed = 0;

Expand Down Expand Up @@ -587,7 +587,7 @@ int raft_recv_requestvote(raft_server_t* me_,
goto done;
}
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
}

if (__should_grant_vote(me, vr))
Expand All @@ -603,7 +603,7 @@ int raft_recv_requestvote(raft_server_t* me_,
r->vote_granted = 0;

/* there must be in an election. */
me->current_leader = NULL;
me->leader_id = -1;

me->timeout_elapsed = 0;
}
Expand Down Expand Up @@ -661,7 +661,7 @@ int raft_recv_requestvote_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (raft_get_current_term(me_) != r->term)
Expand Down Expand Up @@ -730,7 +730,7 @@ int raft_recv_installsnapshot(raft_server_t* me_,
if (!raft_is_follower(me_))
raft_become_follower(me_);

me->current_leader = node;
me->leader_id = raft_node_get_id(node);
me->timeout_elapsed = 0;

if (is->last_idx <= raft_get_commit_idx(me_))
Expand Down Expand Up @@ -778,7 +778,7 @@ int raft_recv_installsnapshot_response(raft_server_t* me_,
if (0 != e)
return e;
raft_become_follower(me_);
me->current_leader = NULL;
me->leader_id = -1;
return 0;
}
else if (me->current_term != r->term)
Expand Down Expand Up @@ -841,8 +841,7 @@ int raft_recv_entry(raft_server_t* me_,
{
raft_node_t* node = me->nodes[i];

if (me->node == node ||
!node ||
if (!node || raft_is_self(me_, node) ||
!raft_node_is_active(node) ||
!raft_node_is_voting(node))
continue;
Expand Down Expand Up @@ -875,7 +874,7 @@ int raft_send_requestvote(raft_server_t* me_, raft_node_t* node)
int e = 0;

assert(node);
assert(node != me->node);
assert(!raft_is_self(me_, node));

__log(me_, node, "sending requestvote to: %d", raft_node_get_id(node));

Expand Down Expand Up @@ -932,7 +931,7 @@ int raft_apply_entry(raft_server_t* me_)

int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, log_idx);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);

switch (ety->type) {
case RAFT_LOGTYPE_ADD_NODE:
Expand All @@ -943,13 +942,16 @@ int raft_apply_entry(raft_server_t* me_)
break;
case RAFT_LOGTYPE_REMOVE_NODE:
/* Do not remove node if there are pending entries that affect it */
if (raft_node_get_offered_idx(node) == log_idx)
if (node && raft_node_get_offered_idx(node) == log_idx)
raft_remove_node(me_, node);
break;
}
raft_node_set_applied_idx(node, log_idx);
if (raft_node_get_offered_idx(node) == log_idx)
raft_node_set_offered_idx(node, -1);
if (node)
{
raft_node_set_applied_idx(node, log_idx);
if (raft_node_get_offered_idx(node) == log_idx)
raft_node_set_offered_idx(node, -1);
}

return 0;
}
Expand Down Expand Up @@ -985,7 +987,7 @@ int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
raft_server_private_t* me = (raft_server_private_t*)me_;

assert(node);
assert(node != me->node);
assert(!raft_is_self(me_, node));

if (!(me->cb.send_appendentries))
return -1;
Expand Down Expand Up @@ -1026,7 +1028,8 @@ int raft_send_appendentries_all(raft_server_t* me_)
me->timeout_elapsed = 0;
for (i = 0; i < me->num_nodes; i++)
{
if (me->node == me->nodes[i] || !raft_node_is_active(me->nodes[i]))
if (raft_is_self(me_, me->nodes[i]) ||
!raft_node_is_active(me->nodes[i]))
continue;

e = raft_send_appendentries(me_, me->nodes[i]);
Expand Down Expand Up @@ -1060,7 +1063,7 @@ raft_node_t* raft_add_node_internal(raft_server_t* me_, raft_entry_t *ety, void*
me->nodes = p;
me->nodes[me->num_nodes - 1] = node;
if (is_self)
me->node = me->nodes[me->num_nodes - 1];
me->node_id = id;

node = me->nodes[me->num_nodes - 1];

Expand Down Expand Up @@ -1125,7 +1128,7 @@ int raft_get_nvotes_for_me(raft_server_t* me_)

for (i = 0, votes = 0; i < me->num_nodes; i++)
{
if (me->node != me->nodes[i] &&
if (!raft_is_self(me_, me->nodes[i]) &&
raft_node_is_active(me->nodes[i]) &&
raft_node_is_voting(me->nodes[i]) &&
raft_node_has_vote_for_me(me->nodes[i]))
Expand Down Expand Up @@ -1241,8 +1244,9 @@ void raft_offer_log(raft_server_t* me_, raft_entry_t* entries,
int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_),
ety, idx + i);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
raft_node_set_offered_idx(node, idx + i);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);
if (node)
raft_node_set_offered_idx(node, idx + i);
}
}

Expand Down Expand Up @@ -1422,7 +1426,9 @@ static void raft_reset_node_indices(raft_server_t *me_, int max_idx)
continue;
int node_id = me->cb.log_get_node_id(me_, raft_get_udata(me_), ety, idx);
raft_node_t* node = raft_get_node(me_, node_id);
assert(node);
assert(node || ety->type == RAFT_LOGTYPE_REMOVE_NODE);
if (!node)
continue;
int off_idx = raft_node_get_offered_idx(node);
if (off_idx > max_idx || off_idx < idx)
{
Expand Down
26 changes: 11 additions & 15 deletions src/raft_server_properties.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ void raft_set_request_timeout(raft_server_t* me_, int millisec)
int raft_get_nodeid(raft_server_t* me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
if (!me->node)
return -1;
return raft_node_get_id(me->node);
return me->node_id;
}

int raft_get_election_timeout(raft_server_t* me_)
Expand Down Expand Up @@ -140,7 +138,7 @@ void raft_set_state(raft_server_t* me_, int state)
raft_server_private_t* me = (raft_server_private_t*)me_;
/* if became the leader, then update the current leader entry */
if (state == RAFT_STATE_LEADER)
me->current_leader = me->node;
me->leader_id = me->node_id;
me->state = state;
}

Expand All @@ -164,13 +162,7 @@ raft_node_t* raft_get_node(raft_server_t *me_, int nodeid)
raft_node_t* raft_get_my_node(raft_server_t *me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
int i;

for (i = 0; i < me->num_nodes; i++)
if (raft_get_nodeid(me_) == raft_node_get_id(me->nodes[i]))
return me->nodes[i];

return NULL;
return raft_get_node(me_, me->node_id);
}

raft_node_t* raft_get_node_from_idx(raft_server_t* me_, const int idx)
Expand All @@ -182,15 +174,13 @@ raft_node_t* raft_get_node_from_idx(raft_server_t* me_, const int idx)
int raft_get_current_leader(raft_server_t* me_)
{
raft_server_private_t* me = (void*)me_;
if (me->current_leader)
return raft_node_get_id(me->current_leader);
return -1;
return me->leader_id;
}

raft_node_t* raft_get_current_leader_node(raft_server_t* me_)
{
raft_server_private_t* me = (void*)me_;
return me->current_leader;
return raft_get_node(me_, me->leader_id);
}

void* raft_get_udata(raft_server_t* me_)
Expand All @@ -213,6 +203,12 @@ int raft_is_candidate(raft_server_t* me_)
return raft_get_state(me_) == RAFT_STATE_CANDIDATE;
}

int raft_is_self(raft_server_t* me_, raft_node_t* node)
{
raft_server_private_t* me = (void*)me_;
return (node && raft_node_get_id(node) == me->node_id);
}

int raft_get_last_log_term(raft_server_t* me_)
{
int current_idx = raft_get_current_idx(me_);
Expand Down