Skip to content

Commit

Permalink
fix race condition.
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmr committed Jun 4, 2020
1 parent 8a6ebfe commit 572f42e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 20 deletions.
32 changes: 16 additions & 16 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,11 @@ func (n *node) handleMemberProposal(member *pb.Member) error {
if m != nil && (m.Id != member.Id || m.GroupId != member.GroupId) {
return errors.Errorf("Found another member %d with same address: %v", m.Id, m.Addr)
}

// Handle zero membership updates.
if member.GroupId == 0 {
state.Zeros[member.Id] = member
if member.Leader {
// Unset leader flag for other nodes, there can be only one leader at a time.
// Unset leader flag for other nodes, there can be only one
// leader at a time.
for _, m := range state.Zeros {
if m.Id != member.Id {
m.Leader = false
Expand All @@ -191,15 +190,12 @@ func (n *node) handleMemberProposal(member *pb.Member) error {
}
return nil
}

// Handle alpha membership updates.
group := state.Groups[member.GroupId]
if group == nil {
group = newGroup()
state.Groups[member.GroupId] = group
}
m, has := group.Members[member.Id]

if member.AmDead {
if has {
delete(group.Members, member.Id)
Expand All @@ -210,16 +206,7 @@ func (n *node) handleMemberProposal(member *pb.Member) error {

if !has && len(group.Members) >= n.server.NumReplicas {
// We shouldn't allow more members than the number of replicas.
return errors.Errorf("Group reached replication level. Can't add another member: %+v",
member)
}

if has && m.Addr != member.Addr {
// Two different alphas have tried joining the cluster with the same ID.
// Reject this update. The alpha is responsible of retrying until succeeds
// in joining the cluster.
return errors.Errorf("duplicate RAFT ID %d already taken by member at address %s",
m.Id, m.Addr)
return errors.Errorf("Group reached replication level. Can't add another member: %+v", member)
}

// Create a connection to this server.
Expand Down Expand Up @@ -333,12 +320,25 @@ func (n *node) applyProposal(e raftpb.Entry) (string, error) {
}
state.Cid = p.Cid
}

if p.Member != nil && p.Member.Id == 0 {
// A member update with a RAFT ID of zero indicates that this is a new node.
// Allocate a new ID to this member and update the proposal accordingly.
// Before, this was done when creating the proposal. However, this
// lead to a race condition in which multiple new nodes can be assigned
// the same ID because the lock is released in-between creating the proposal
// and staring to apply it.
p.Member.Id = state.MaxRaftId + 1
p.MaxRaftId = p.Member.Id
}

if p.MaxRaftId > 0 {
if p.MaxRaftId <= state.MaxRaftId {
return p.Key, errInvalidProposal
}
state.MaxRaftId = p.MaxRaftId
}

if p.SnapshotTs != nil {
for gid, ts := range p.SnapshotTs {
if group, ok := state.Groups[gid]; ok {
Expand Down
4 changes: 0 additions & 4 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,6 @@ func (s *Server) Connect(ctx context.Context,
return nil
}
}
if m.Id == 0 {
m.Id = s.state.MaxRaftId + 1
proposal.MaxRaftId = m.Id
}

// We don't have this member. So, let's see if it has preference for a group.
if m.GroupId > 0 {
Expand Down

0 comments on commit 572f42e

Please sign in to comment.