Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle various edge cases around cluster memberships. #2791

Merged
merged 2 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ import (
)

var (
ErrDuplicateRaftId = x.Errorf("Node is already part of group")
ErrNoNode = x.Errorf("No node has been set up yet")
ErrNoNode = x.Errorf("No node has been set up yet")
)

type Node struct {
Expand Down
26 changes: 25 additions & 1 deletion conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,34 @@ func (p *Pools) Get(addr string) (*Pool, error) {
return pool, nil
}

func (p *Pools) Remove(addr string) {
func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
// Keeps track of active IP addresses, assigned to valid nodes. We do this to avoid removing
// inactive IP addresses from the Removed list.
validAddr := make(map[string]struct{})
for _, group := range state.Groups {
for _, member := range group.Members {
validAddr[member.Addr] = struct{}{}
}
}
for _, member := range state.Zeros {
validAddr[member.Addr] = struct{}{}
}
for _, member := range state.Removed {
// Some nodes could have the same IP address. So, check before disconnecting.
if _, valid := validAddr[member.Addr]; !valid {
p.remove(member.Addr)
}
}
}

func (p *Pools) remove(addr string) {
p.Lock()
pool, ok := p.all[addr]
if !ok {
p.Unlock()
return
}
glog.Warningf("DISCONNECTING from %s\n", addr)
delete(p.all, addr)
p.Unlock()
pool.shutdown()
Expand Down Expand Up @@ -200,6 +221,9 @@ func (p *Pool) MonitorHealth() {
}

func (p *Pool) IsHealthy() bool {
if p == nil {
return false
}
p.RLock()
defer p.RUnlock()
return time.Since(p.lastEcho) < 2*echoDuration
Expand Down
5 changes: 3 additions & 2 deletions conn/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,15 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
}
// Also check that the new node is not me.
if rc.Id == node.RaftContext.Id {
return nil, ErrDuplicateRaftId
return nil, x.Errorf("REUSE_RAFTID: Raft ID duplicates mine: %+v", rc)
}

// Check that the new node is not already part of the group.
if addr, ok := node.Peer(rc.Id); ok && rc.Addr != addr {
// There exists a healthy connection to server with same id.
if _, err := Get().Get(addr); err == nil {
return &api.Payload{}, ErrDuplicateRaftId
return &api.Payload{}, x.Errorf(
"REUSE_ADDR: IP Address same as existing peer: %s", addr)
}
}
node.Connect(rc.Id, rc.Addr)
Expand Down
23 changes: 13 additions & 10 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

otrace "go.opencensus.io/trace"
"google.golang.org/grpc"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
Expand Down Expand Up @@ -188,7 +187,6 @@ func (n *node) handleMemberProposal(member *pb.Member) error {
if has {
delete(group.Members, member.Id)
state.Removed = append(state.Removed, m)
conn.Get().Remove(m.Addr)
}
// else already removed.
return nil
Expand Down Expand Up @@ -348,6 +346,9 @@ func (n *node) applyConfChange(e raftpb.Entry) {
cc.Unmarshal(e.Data)

if cc.Type == raftpb.ConfChangeRemoveNode {
if cc.NodeID == n.Id {
glog.Fatalf("I [id:%d group:0] have been removed. Goodbye!", n.Id)
}
n.DeletePeer(cc.NodeID)
n.server.removeZero(cc.NodeID)

Expand All @@ -360,8 +361,9 @@ func (n *node) applyConfChange(e raftpb.Entry) {
for _, member := range n.server.membershipState().Removed {
// It is not recommended to reuse RAFT ids.
if member.GroupId == 0 && m.Id == member.Id {
x.Errorf("Reusing removed id: %d. Canceling config change.\n", m.Id)
n.DoneConfChange(cc.ID, x.ErrReuseRemovedId)
// TODO: Is this unused?
err := x.Errorf("REUSE_RAFTID: Reusing removed id: %d.\n", m.Id)
n.DoneConfChange(cc.ID, err)
// Cancel configuration change.
cc.NodeID = raft.None
n.Raft().ApplyConfChange(cc)
Expand Down Expand Up @@ -429,10 +431,8 @@ func (n *node) initAndStartNode() error {
if err == nil {
break
}
errorDesc := grpc.ErrorDesc(err)
if errorDesc == conn.ErrDuplicateRaftId.Error() ||
errorDesc == x.ErrReuseRemovedId.Error() {
log.Fatalf("Error while joining cluster: %v", errorDesc)
if x.ShouldCrash(err) {
log.Fatalf("Error while joining cluster: %v", err)
}
glog.Errorf("Error while joining cluster: %v\n", err)
timeout *= 2
Expand Down Expand Up @@ -498,11 +498,14 @@ func (n *node) checkQuorum(closer *y.Closer) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

if err := n.WaitLinearizableRead(ctx); err == nil {
if state, err := n.server.latestMembershipState(ctx); err == nil {
n.mu.Lock()
n.lastQuorum = time.Now()
n.mu.Unlock()
} else if glog.V(2) {
// Also do some connection cleanup.
conn.Get().RemoveInvalid(state)

} else if glog.V(1) {
glog.Warningf("Zero node: %d unable to reach quorum.", n.Id)
}
}
Expand Down
65 changes: 41 additions & 24 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ var (
emptyMembershipState pb.MembershipState
emptyConnectionState pb.ConnectionState
errInternalError = errors.New("Internal server error")
errUnknownMember = errors.New("Unknown cluster member")
errUpdatedMember = errors.New("Cluster member has updated credentials.")
errServerShutDown = errors.New("Server is being shut down.")
// errUnknownMember = errors.New("Unknown cluster member")
errUpdatedMember = errors.New("Cluster member has updated credentials.")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errUpdatedMember is unused (from varcheck)

errServerShutDown = errors.New("Server is being shut down.")
)

type Server struct {
Expand Down Expand Up @@ -254,7 +254,6 @@ func (s *Server) removeZero(nodeId uint64) {
return
}
delete(s.state.Zeros, nodeId)
go conn.Get().Remove(m.Addr)
s.state.Removed = append(s.state.Removed, m)
}

Expand Down Expand Up @@ -298,11 +297,11 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
for mid, dstMember := range dst.Members {
group, has := s.state.Groups[dstMember.GroupId]
if !has {
return res, errUnknownMember
return res, x.Errorf("Unknown group for member: %+v", dstMember)
}
srcMember, has := group.Members[mid]
if !has {
return res, errUnknownMember
return res, x.Errorf("Unknown member: %+v", dstMember)
}
if srcMember.Addr != dstMember.Addr ||
srcMember.Leader != dstMember.Leader {
Expand All @@ -325,7 +324,7 @@ func (s *Server) createProposals(dst *pb.Group) ([]*pb.ZeroProposal, error) {
for key, dstTablet := range dst.Tablets {
group, has := s.state.Groups[dstTablet.GroupId]
if !has {
return res, errUnknownMember
return res, x.Errorf("Unknown group for tablet: %+v", dstTablet)
}
srcTablet, has := group.Tablets[key]
if !has {
Expand Down Expand Up @@ -375,37 +374,55 @@ func (s *Server) Connect(ctx context.Context,
x.Errorf("Context has error: %v\n", ctx.Err())
return &emptyConnectionState, ctx.Err()
}
ms, err := s.latestMembershipState(ctx)
if err != nil {
return nil, err
}
if m.ClusterInfoOnly {
// This request only wants to access the membership state, and nothing else. Most likely
// from our clients.
ms, err := s.latestMembershipState(ctx)
cs := &pb.ConnectionState{
State: ms,
MaxPending: s.orc.MaxPending(),
}
return cs, err
}
if len(m.Addr) == 0 {
return &emptyConnectionState, x.Errorf("No address provided: %+v", m)
return &emptyConnectionState, x.Errorf("NO_ADDR: No address provided: %+v", m)
}

for _, member := range s.membershipState().Removed {
for _, member := range ms.Removed {
// It is not recommended to reuse RAFT ids.
if member.GroupId != 0 && m.Id == member.Id {
return &emptyConnectionState, x.ErrReuseRemovedId
}
}

for _, group := range s.state.Groups {
member, has := group.Members[m.Id]
if !has {
break
}
if member.Addr != m.Addr {
// Different address, then check if the last one is healthy or not.
if _, err := conn.Get().Get(member.Addr); err == nil {
// Healthy conn to the existing member with the same id.
return &emptyConnectionState, conn.ErrDuplicateRaftId
return &emptyConnectionState, x.Errorf(
"REUSE_RAFTID: Duplicate Raft ID %d to removed member: %+v", m.Id, member)
}
}

for _, group := range ms.Groups {
for _, member := range group.Members {
switch {
case member.Addr == m.Addr && m.Id == 0:
glog.Infof("Found a member with the same address. Returning: %+v", member)
conn.Get().Connect(m.Addr)
return &pb.ConnectionState{
State: ms,
Member: member,
}, nil

case member.Addr == m.Addr && member.Id != m.Id:
// Same address. Different Id. If Id is zero, then it might be trying to connect for
// the first time. We can just directly return the membership information.
return nil, x.Errorf("REUSE_ADDR: Duplicate address to existing member: %+v."+
" Self: +%v", member, m)

case member.Addr != m.Addr && member.Id == m.Id:
// Same Id. Different address.
if pl, err := conn.Get().Get(member.Addr); err == nil && pl.IsHealthy() {
// Found a healthy connection.
return nil, x.Errorf("REUSE_RAFTID: Healthy connection to a member"+
" with same ID: %+v", member)
}
}
}
}
Expand Down
38 changes: 21 additions & 17 deletions worker/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"

"golang.org/x/net/context"

"github.com/dgraph-io/badger"
Expand Down Expand Up @@ -103,7 +101,7 @@ func StartRaftNodes(walStore *badger.DB, bindall bool) {
}
zc := pb.NewZeroClient(pl.Get())
connState, err = zc.Connect(gr.ctx, m)
if err == nil || grpc.ErrorDesc(err) == x.ErrReuseRemovedId.Error() {
if err == nil || x.ShouldCrash(err) {
break
}
}
Expand Down Expand Up @@ -260,27 +258,15 @@ func (g *groupi) applyState(state *pb.MembershipState) {
if g.state != nil && g.state.Counter > state.Counter {
return
}

g.state = state

// While restarting we fill Node information after retrieving initial state.
if g.Node != nil {
// Lets have this block before the one that adds the new members, else we may end up
// removing a freshly added node.
for _, member := range g.state.Removed {
if member.GroupId == g.Node.gid && g.Node.AmLeader() {
go g.Node.ProposePeerRemoval(context.Background(), member.Id)
}
// Each node should have different id and address.
conn.Get().Remove(member.Addr)
}
}

// Sometimes this can cause us to lose latest tablet info, but that shouldn't cause any issues.
var foundSelf bool
g.tablets = make(map[string]*pb.Tablet)
for gid, group := range g.state.Groups {
for _, member := range group.Members {
if Config.RaftId == member.Id {
foundSelf = true
atomic.StoreUint32(&g.gid, gid)
}
if Config.MyAddr != member.Addr {
Expand All @@ -296,6 +282,23 @@ func (g *groupi) applyState(state *pb.MembershipState) {
conn.Get().Connect(member.Addr)
}
}
if !foundSelf {
// I'm not part of this cluster. I should crash myself.
glog.Fatalf("Unable to find myself [id:%d group:%d] in membership state: %+v. Goodbye!",
g.Node.Id, g.groupId(), state)
}

// While restarting we fill Node information after retrieving initial state.
if g.Node != nil {
// Lets have this block before the one that adds the new members, else we may end up
// removing a freshly added node.
for _, member := range g.state.Removed {
if member.GroupId == g.Node.gid && g.Node.AmLeader() {
go g.Node.ProposePeerRemoval(context.Background(), member.Id)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error return value of g.Node.ProposePeerRemoval is not checked (from errcheck)

}
}
conn.Get().RemoveInvalid(g.state)
}
}

func (g *groupi) ServesGroup(gid uint32) bool {
Expand Down Expand Up @@ -521,6 +524,7 @@ func (g *groupi) connToZeroLeader() *conn.Pool {
}
zc := pb.NewZeroClient(pl.Get())
if pl := getLeaderConn(zc); pl != nil {
glog.V(1).Infof("Found connection to leader: %s", pl.Addr)
return pl
}
glog.V(1).Infof("Unable to connect to a healthy Zero leader. Retrying...")
Expand Down
17 changes: 14 additions & 3 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -29,6 +28,8 @@ import (
"strconv"
"strings"
"time"

"google.golang.org/grpc"
)

// Error constants representing different types of errors.
Expand Down Expand Up @@ -68,10 +69,20 @@ const (

var (
// Useful for running multiple servers on the same machine.
regExpHostName = regexp.MustCompile(ValidHostnameRegex)
ErrReuseRemovedId = errors.New("Reusing RAFT index of a removed node.")
regExpHostName = regexp.MustCompile(ValidHostnameRegex)
// ErrReuseRemovedId = errors.New("REUSE_RAFTID: Reusing RAFT index of a removed node.")
)

func ShouldCrash(err error) bool {
if err == nil {
return false
}
errStr := grpc.ErrorDesc(err)
return strings.Contains(errStr, "REUSE_RAFTID") ||
strings.Contains(errStr, "REUSE_ADDR") ||
strings.Contains(errStr, "NO_ADDR")
}

// WhiteSpace Replacer removes spaces and tabs from a string.
var WhiteSpace = strings.NewReplacer(" ", "", "\t", "")

Expand Down