Skip to content

Commit

Permalink
Handle various edge cases around cluster memberships. (#2791)
Browse files Browse the repository at this point in the history
- We had a bug where we would endlessly remove and reconnect to an address that is present in both the active nodes and the removed nodes. This PR checks the active nodes first, before removing a connection.
- Make the errors a lot more descriptive. Use a unique string to determine when a node must crash due to permanent failures. Refactor those conditions out into `x.ShouldCrash`.
- Removing a live node using `/removeNode` should now automatically crash that node, and cause others to remove the connection to that node, which is convenient.
- Trying to connect using the same ID or the same Address (while the previous connection is still valid) would also crash the new process (wanted behavior).
- If Zero already has a member set for a given Address (and ID=0), it would return the member info directly, instead of proposing first.

In general, this PR handles a bunch of edge cases and bugs around membership.
  • Loading branch information
manishrjain authored Nov 29, 2018
1 parent 1b301e2 commit 9e1ce3d
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 58 deletions.
3 changes: 1 addition & 2 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ import (
)

var (
ErrDuplicateRaftId = x.Errorf("Node is already part of group")
ErrNoNode = x.Errorf("No node has been set up yet")
ErrNoNode = x.Errorf("No node has been set up yet")
)

type Node struct {
Expand Down
26 changes: 25 additions & 1 deletion conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,34 @@ func (p *Pools) Get(addr string) (*Pool, error) {
return pool, nil
}

func (p *Pools) Remove(addr string) {
// RemoveInvalid disconnects from the addresses of removed members listed in
// state.Removed, unless the same address is also used by a current group
// member or Zero node in state.
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
	// Gather the addresses of every active node (Zeros and group members). A
	// removed node can share its address with an active one, and in that case
	// the connection must be kept.
	inUse := make(map[string]struct{})
	for _, zero := range state.Zeros {
		inUse[zero.Addr] = struct{}{}
	}
	for _, grp := range state.Groups {
		for _, m := range grp.Members {
			inUse[m.Addr] = struct{}{}
		}
	}

	for _, gone := range state.Removed {
		if _, active := inUse[gone.Addr]; active {
			// Address is shared with an active node; do not disconnect.
			continue
		}
		p.remove(gone.Addr)
	}
}

func (p *Pools) remove(addr string) {
p.Lock()
pool, ok := p.all[addr]
if !ok {
p.Unlock()
return
}
glog.Warningf("DISCONNECTING from %s\n", addr)
delete(p.all, addr)
p.Unlock()
pool.shutdown()
Expand Down Expand Up @@ -200,6 +221,9 @@ func (p *Pool) MonitorHealth() {
}

func (p *Pool) IsHealthy() bool {
if p == nil {
return false
}
p.RLock()
defer p.RUnlock()
return time.Since(p.lastEcho) < 2*echoDuration
Expand Down
5 changes: 3 additions & 2 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,15 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
}
// Also check that the new node is not me.
if rc.Id == node.RaftContext.Id {
return nil, ErrDuplicateRaftId
return nil, x.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
}

// Check that the new node is not already part of the group.
if addr, ok := node.Peer(rc.Id); ok && rc.Addr != addr {
// There exists a healthy connection to server with same id.
if _, err := Get().Get(addr); err == nil {
return &api.Payload{}, ErrDuplicateRaftId
return &api.Payload{}, x.Errorf(
"REUSE_ADDR: IP Address same as existing peer: %s", addr)
}
}
node.Connect(rc.Id, rc.Addr)
Expand Down
22 changes: 12 additions & 10 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

otrace "go.opencensus.io/trace"
"google.golang.org/grpc"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
Expand Down Expand Up @@ -188,7 +187,6 @@ func (n *node) handleMemberProposal(member *pb.Member) error {
if has {
delete(group.Members, member.Id)
state.Removed = append(state.Removed, m)
conn.Get().Remove(m.Addr)
}
// else already removed.
return nil
Expand Down Expand Up @@ -348,6 +346,9 @@ func (n *node) applyConfChange(e raftpb.Entry) {
cc.Unmarshal(e.Data)

if cc.Type == raftpb.ConfChangeRemoveNode {
if cc.NodeID == n.Id {
glog.Fatalf("I [id:%d group:0] have been removed. Goodbye!", n.Id)
}
n.DeletePeer(cc.NodeID)
n.server.removeZero(cc.NodeID)

Expand All @@ -360,8 +361,8 @@ func (n *node) applyConfChange(e raftpb.Entry) {
for _, member := range n.server.membershipState().Removed {
// It is not recommended to reuse RAFT ids.
if member.GroupId == 0 && m.Id == member.Id {
x.Errorf("Reusing removed id: %d. Canceling config change.\n", m.Id)
n.DoneConfChange(cc.ID, x.ErrReuseRemovedId)
err := x.Errorf("REUSE_RAFTID: Reusing removed id: %d.\n", m.Id)
n.DoneConfChange(cc.ID, err)
// Cancel configuration change.
cc.NodeID = raft.None
n.Raft().ApplyConfChange(cc)
Expand Down Expand Up @@ -429,10 +430,8 @@ func (n *node) initAndStartNode() error {
if err == nil {
break
}
errorDesc := grpc.ErrorDesc(err)
if errorDesc == conn.ErrDuplicateRaftId.Error() ||
errorDesc == x.ErrReuseRemovedId.Error() {
log.Fatalf("Error while joining cluster: %v", errorDesc)
if x.ShouldCrash(err) {
log.Fatalf("Error while joining cluster: %v", err)
}
glog.Errorf("Error while joining cluster: %v\n", err)
timeout *= 2
Expand Down Expand Up @@ -498,11 +497,14 @@ func (n *node) checkQuorum(closer *y.Closer) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

if err := n.WaitLinearizableRead(ctx); err == nil {
if state, err := n.server.latestMembershipState(ctx); err == nil {
n.mu.Lock()
n.lastQuorum = time.Now()
n.mu.Unlock()
} else if glog.V(2) {
// Also do some connection cleanup.
conn.Get().RemoveInvalid(state)

} else if glog.V(1) {
glog.Warningf("Zero node: %d unable to reach quorum.", n.Id)
}
}
Expand Down
61 changes: 38 additions & 23 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ var (
emptyMembershipState pb.MembershipState
emptyConnectionState pb.ConnectionState
errInternalError = errors.New("Internal server error")
errUnknownMember = errors.New("Unknown cluster member")
errUpdatedMember = errors.New("Cluster member has updated credentials.")
errServerShutDown = errors.New("Server is being shut down.")
)

Expand Down Expand Up @@ -254,7 +252,6 @@ func (s *Server) removeZero(nodeId uint64) {
return
}
delete(s.state.Zeros, nodeId)
go conn.Get().Remove(m.Addr)
s.state.Removed = append(s.state.Removed, m)
}

Expand Down Expand Up @@ -298,11 +295,11 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
for mid, dstMember := range dst.Members {
group, has := s.state.Groups[dstMember.GroupId]
if !has {
return res, errUnknownMember
return res, x.Errorf("Unknown group for member: %+v", dstMember)
}
srcMember, has := group.Members[mid]
if !has {
return res, errUnknownMember
return res, x.Errorf("Unknown member: %+v", dstMember)
}
if srcMember.Addr != dstMember.Addr ||
srcMember.Leader != dstMember.Leader {
Expand All @@ -325,7 +322,7 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
for key, dstTablet := range dst.Tablets {
group, has := s.state.Groups[dstTablet.GroupId]
if !has {
return res, errUnknownMember
return res, x.Errorf("Unknown group for tablet: %+v", dstTablet)
}
srcTablet, has := group.Tablets[key]
if !has {
Expand Down Expand Up @@ -375,37 +372,55 @@ func (s *Server) Connect(ctx context.Context,
x.Errorf("Context has error: %v\n", ctx.Err())
return &emptyConnectionState, ctx.Err()
}
ms, err := s.latestMembershipState(ctx)
if err != nil {
return nil, err
}
if m.ClusterInfoOnly {
// This request only wants to access the membership state, and nothing else. Most likely
// from our clients.
ms, err := s.latestMembershipState(ctx)
cs := &pb.ConnectionState{
State: ms,
MaxPending: s.orc.MaxPending(),
}
return cs, err
}
if len(m.Addr) == 0 {
return &emptyConnectionState, x.Errorf("No address provided: %+v", m)
return &emptyConnectionState, x.Errorf("NO_ADDR: No address provided: %+v", m)
}

for _, member := range s.membershipState().Removed {
for _, member := range ms.Removed {
// It is not recommended to reuse RAFT ids.
if member.GroupId != 0 && m.Id == member.Id {
return &emptyConnectionState, x.ErrReuseRemovedId
}
}

for _, group := range s.state.Groups {
member, has := group.Members[m.Id]
if !has {
break
}
if member.Addr != m.Addr {
// Different address, then check if the last one is healthy or not.
if _, err := conn.Get().Get(member.Addr); err == nil {
// Healthy conn to the existing member with the same id.
return &emptyConnectionState, conn.ErrDuplicateRaftId
return &emptyConnectionState, x.Errorf(
"REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
}
}

for _, group := range ms.Groups {
for _, member := range group.Members {
switch {
case member.Addr == m.Addr && m.Id == 0:
glog.Infof("Found a member with the same address. Returning: %+v", member)
conn.Get().Connect(m.Addr)
return &pb.ConnectionState{
State: ms,
Member: member,
}, nil

case member.Addr == m.Addr && member.Id != m.Id:
// Same address. Different Id. The Id == 0 case (possibly a first-time connect) is
// handled by the case above, so here a different node is reusing an existing member's address.
return nil, x.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
" Self: +%v", member, m)

case member.Addr != m.Addr && member.Id == m.Id:
// Same Id. Different address.
if pl, err := conn.Get().Get(member.Addr); err == nil && pl.IsHealthy() {
// Found a healthy connection.
return nil, x.Errorf("REUSE_RAFTID: Healthy connection to a member"+
" with same ID: %+v", member)
}
}
}
}
Expand Down
38 changes: 21 additions & 17 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"

"golang.org/x/net/context"

"github.com/dgraph-io/badger"
Expand Down Expand Up @@ -103,7 +101,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}
zc := pb.NewZeroClient(pl.Get())
connState, err = zc.Connect(gr.ctx, m)
if err == nil || grpc.ErrorDesc(err) == x.ErrReuseRemovedId.Error() {
if err == nil || x.ShouldCrash(err) {
break
}
}
Expand Down Expand Up @@ -260,27 +258,15 @@ func (g *groupi) applyState(state *pb.MembershipState) {
if g.state != nil && g.state.Counter > state.Counter {
return
}

g.state = state

// While restarting we fill Node information after retrieving initial state.
if g.Node != nil {
// Lets have this block before the one that adds the new members, else we may end up
// removing a freshly added node.
for _, member := range g.state.Removed {
if member.GroupId == g.Node.gid && g.Node.AmLeader() {
go g.Node.ProposePeerRemoval(context.Background(), member.Id)
}
// Each node should have different id and address.
conn.Get().Remove(member.Addr)
}
}

// Sometimes this can cause us to lose latest tablet info, but that shouldn't cause any issues.
var foundSelf bool
g.tablets = make(map[string]*pb.Tablet)
for gid, group := range g.state.Groups {
for _, member := range group.Members {
if Config.RaftId == member.Id {
foundSelf = true
atomic.StoreUint32(&g.gid, gid)
}
if Config.MyAddr != member.Addr {
Expand All @@ -296,6 +282,23 @@ func (g *groupi) applyState(state *pb.MembershipState) {
conn.Get().Connect(member.Addr)
}
}
if !foundSelf {
// I'm not part of this cluster. I should crash myself.
glog.Fatalf("Unable to find myself [id:%d group:%d] in membership state: %+v. Goodbye!",
g.Node.Id, g.groupId(), state)
}

// While restarting we fill Node information after retrieving initial state.
if g.Node != nil {
// Lets have this block before the one that adds the new members, else we may end up
// removing a freshly added node.
for _, member := range g.state.Removed {
if member.GroupId == g.Node.gid && g.Node.AmLeader() {
go g.Node.ProposePeerRemoval(context.Background(), member.Id)
}
}
conn.Get().RemoveInvalid(g.state)
}
}

func (g *groupi) ServesGroup(gid uint32) bool {
Expand Down Expand Up @@ -521,6 +524,7 @@ func (g *groupi) connToZeroLeader() *conn.Pool {
}
zc := pb.NewZeroClient(pl.Get())
if pl := getLeaderConn(zc); pl != nil {
glog.V(1).Infof("Found connection to leader: %s", pl.Addr)
return pl
}
glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...")
Expand Down
16 changes: 13 additions & 3 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -29,6 +28,8 @@ import (
"strconv"
"strings"
"time"

"google.golang.org/grpc"
)

// Error constants representing different types of errors.
Expand Down Expand Up @@ -68,10 +69,19 @@ const (

var (
// Useful for running multiple servers on the same machine.
regExpHostName = regexp.MustCompile(ValidHostnameRegex)
ErrReuseRemovedId = errors.New("Reusing RAFT index of a removed node.")
regExpHostName = regexp.MustCompile(ValidHostnameRegex)
)

// ShouldCrash reports whether err carries one of the marker strings
// (REUSE_RAFTID, REUSE_ADDR, NO_ADDR) in its gRPC error description. Callers
// use it to decide that a failure is permanent and the process should exit
// instead of retrying. A nil err never triggers a crash.
func ShouldCrash(err error) bool {
	if err == nil {
		return false
	}
	desc := grpc.ErrorDesc(err)
	for _, marker := range []string{"REUSE_RAFTID", "REUSE_ADDR", "NO_ADDR"} {
		if strings.Contains(desc, marker) {
			return true
		}
	}
	return false
}

// WhiteSpace Replacer removes spaces and tabs from a string.
var WhiteSpace = strings.NewReplacer(" ", "", "\t", "")

Expand Down

0 comments on commit 9e1ce3d

Please sign in to comment.