Bug Fix: Avoid Dgraph cluster getting stuck in infinite leader election #3391
Changes from all commits
d86a310
6be5733
e2514e4
cd1f778
fb45daa
f5f170e
@@ -72,6 +72,9 @@ type Node struct {
     // The stages are proposed -> committed (accepted by cluster) ->
     // applied (to PL) -> synced (to BadgerDB).
     Applied y.WaterMark
+
+    heartbeatsOut int64
+    heartbeatsIn  int64
 }
 
 type ToGlog struct {
@@ -155,6 +158,20 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
     return n
 }
 
+func (n *Node) ReportRaftComms() {
+    if !glog.V(3) {
+        return
+    }
+    ticker := time.NewTicker(time.Second)
+    defer ticker.Stop()
+
+    for range ticker.C {
+        out := atomic.SwapInt64(&n.heartbeatsOut, 0)
+        in := atomic.SwapInt64(&n.heartbeatsIn, 0)
+        glog.Infof("RaftComm: [%#x] Heartbeats out: %d, in: %d", n.Id, out, in)
+    }
+}
+
 // SetRaft would set the provided raft.Node to this node.
 // It would check fail if the node is already set.
 func (n *Node) SetRaft(r raft.Node) {

Review comment on the `if !glog.V(3)` check: Shouldn't this work for level > 3 too?
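On the verbosity question above: glog.V(n) reports true whenever the process runs at verbosity n or higher, so the `!glog.V(3)` guard already lets levels above 3 through. A minimal standalone sketch of that behaviour, assuming the standard github.com/golang/glog package (not part of this diff):

```go
package main

import (
	"flag"

	"github.com/golang/glog"
)

func main() {
	flag.Parse() // glog picks up -v from the command line, e.g. -v=4

	// glog.V(3) is true for -v=3 and for any higher verbosity level,
	// so this guard only skips the work when verbosity is below 3.
	if !glog.V(3) {
		glog.Info("verbosity below 3; reporting disabled")
		return
	}
	glog.Info("verbosity is 3 or higher; reporting enabled")
}
```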
@@ -233,6 +250,7 @@ func (n *Node) Send(msg raftpb.Message) {
     if glog.V(2) {
         switch msg.Type {
         case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
+            atomic.AddInt64(&n.heartbeatsOut, 1)
         case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
         case raftpb.MsgApp, raftpb.MsgAppResp:
         case raftpb.MsgProp:
@@ -724,10 +724,56 @@ func (n *node) updateRaftProgress() error {
     if err := txn.CommitAt(1, nil); err != nil {
         return err
     }
-    glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
+    glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
     return nil
 }
 
+func (n *node) checkpointAndClose(done chan struct{}) {
+    slowTicker := time.NewTicker(time.Minute)
+    defer slowTicker.Stop()
+
+    for {
+        select {
+        case <-slowTicker.C:
+            // Do these operations asynchronously away from the main Run loop to allow heartbeats to
+            // be sent on time. Otherwise, followers would just keep running elections.
+
+            n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
+            if err := n.updateRaftProgress(); err != nil {
+                glog.Errorf("While updating Raft progress: %v", err)
+            }
+
+            if n.AmLeader() {
+                // We keep track of the applied index in the p directory. Even if we don't take
+                // snapshot for a while and let the Raft logs grow and restart, we would not have to
+                // run all the log entries, because we can tell Raft.Config to set Applied to that
+                // index.
+                // This applied index tracking also covers the case when we have a big index
+                // rebuild. The rebuild would be tracked just like others and would not need to be
+                // replayed after a restart, because the Applied config would let us skip right
+                // through it.
+                // We use disk based storage for Raft. So, we're not too concerned about
+                // snapshotting. We just need to do enough, so that we don't have a huge backlog of
+                // entries to process on a restart.
+                if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
+                    x.Errorf("While calculating and proposing snapshot: %v", err)
+                }
+                go n.abortOldTransactions()
+            }
+
+        case <-n.closer.HasBeenClosed():
+            glog.Infof("Stopping node.Run")
+            if peerId, has := groups().MyPeer(); has && n.AmLeader() {
+                n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
+                time.Sleep(time.Second) // Let transfer happen.
+            }
+            n.Raft().Stop()
+            close(done)
+            return
+        }
+    }
+}
+
 func (n *node) Run() {
     defer n.closer.Done() // CLOSER:1
 
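The comments above carry the heart of the fix: checkpointing and snapshot calculation are slow, and while they lived inside the main Run loop the leader could not send heartbeats on time, so followers kept starting elections. A rough standalone sketch of the pattern (illustrative only; tick, checkpoint, and the five-second shutdown are placeholders, not Dgraph code):

```go
package main

import (
	"fmt"
	"time"
)

// Stand-ins for the real work: tick corresponds to Raft ticks/heartbeats,
// checkpoint to the slow snapshot/checkpoint calculation.
func tick()       {}
func checkpoint() { time.Sleep(2 * time.Second) } // deliberately slow

func main() {
	closer := make(chan struct{})
	time.AfterFunc(5*time.Second, func() { close(closer) }) // end the demo

	// Slow work runs in its own goroutine with its own ticker (one minute in
	// the PR, shortened here), so a long checkpoint never delays a tick.
	go func() {
		slow := time.NewTicker(time.Second)
		defer slow.Stop()
		for {
			select {
			case <-slow.C:
				checkpoint()
			case <-closer:
				return
			}
		}
	}()

	// The main loop only does fast work, so heartbeats keep going out on time
	// and followers have no reason to call an election.
	fast := time.NewTicker(20 * time.Millisecond)
	defer fast.Stop()
	for {
		select {
		case <-fast.C:
			tick()
		case <-closer:
			fmt.Println("shutting down")
			return
		}
	}
}
```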
@@ -737,20 +783,9 @@ func (n *node) Run() {
     ticker := time.NewTicker(20 * time.Millisecond)
     defer ticker.Stop()
 
-    slowTicker := time.NewTicker(30 * time.Second)
-    defer slowTicker.Stop()
-
     done := make(chan struct{})
-    go func() {
-        <-n.closer.HasBeenClosed()
-        glog.Infof("Stopping node.Run")
-        if peerId, has := groups().MyPeer(); has && n.AmLeader() {
-            n.Raft().TransferLeadership(n.ctx, x.WorkerConfig.RaftId, peerId)
-            time.Sleep(time.Second) // Let transfer happen.
-        }
-        n.Raft().Stop()
-        close(done)
-    }()
+    go n.checkpointAndClose(done)
+    go n.ReportRaftComms()
 
     applied, err := n.findRaftProgress()
     if err != nil {
@@ -759,6 +794,7 @@ func (n *node) Run() {
         glog.Infof("Found Raft progress in p directory: %d", applied)
     }
 
+    var timer x.Timer
     for {
         select {
         case <-done:
@@ -769,35 +805,15 @@ func (n *node) Run() {
             glog.Infoln("Raft node done.")
             return
 
-        case <-slowTicker.C:
-            n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
-            if err := n.updateRaftProgress(); err != nil {
-                glog.Errorf("While updating Raft progress: %v", err)
-            }
-
-            if leader {
-                // We keep track of the applied index in the p directory. Even if we don't take
-                // snapshot for a while and let the Raft logs grow and restart, we would not have to
-                // run all the log entries, because we can tell Raft.Config to set Applied to that
-                // index.
-                // This applied index tracking also covers the case when we have a big index
-                // rebuild. The rebuild would be tracked just like others and would not need to be
-                // replayed after a restart, because the Applied config would let us skip right
-                // through it.
-                // We use disk based storage for Raft. So, we're not too concerned about
-                // snapshotting. We just need to do enough, so that we don't have a huge backlog of
-                // entries to process on a restart.
-                if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
-                    x.Errorf("While calculating and proposing snapshot: %v", err)
-                }
-                go n.abortOldTransactions()
-            }
-
+        // Slow ticker can't be placed here because figuring out checkpoints and snapshots takes
+        // time and if the leader does not send heartbeats out during this time, the followers
+        // start an election process. And that election process would just continue to happen
+        // indefinitely because checkpoints and snapshots are being calculated indefinitely.
         case <-ticker.C:
             n.Raft().Tick()
 
         case rd := <-n.Raft().Ready():
-            start := time.Now()
+            timer.Start()
             _, span := otrace.StartSpan(n.ctx, "Alpha.RunLoop",
                 otrace.WithSampler(otrace.ProbabilitySampler(0.001)))
 
@@ -875,13 +891,13 @@ func (n *node) Run() {
 
             // Store the hardstate and entries. Note that these are not CommittedEntries.
             n.SaveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
-            diskDur := time.Since(start)
             if span != nil {
                 span.Annotatef(nil, "Saved %d entries. Snapshot, HardState empty? (%v, %v)",
                     len(rd.Entries),
                     raft.IsEmptySnap(rd.Snapshot),
                     raft.IsEmptyHardState(rd.HardState))
             }
+            timer.Record("disk")
 
             // Now schedule or apply committed entries.
             var proposals []*pb.Proposal
@@ -953,8 +969,11 @@ func (n *node) Run() {
             if span != nil {
                 span.Annotate(nil, "Followed queued messages.")
             }
+            timer.Record("proposals")
 
             n.Raft().Advance()
+            timer.Record("advance")
+
             if firstRun && n.canCampaign {
                 go n.Raft().Campaign(n.ctx)
                 firstRun = false
@@ -964,14 +983,13 @@ func (n *node) Run() {
                 span.End()
                 ostats.RecordWithTags(context.Background(),
                     []tag.Mutator{tag.Upsert(x.KeyMethod, "alpha.RunLoop")},
-                    x.LatencyMs.M(x.SinceMs(start)))
+                    x.LatencyMs.M(float64(timer.Total())/1e6))
             }
-            if time.Since(start) > 100*time.Millisecond {
+            if timer.Total() > 100*time.Millisecond {
                 glog.Warningf(
-                    "Raft.Ready took too long to process: %v. Most likely due to slow disk: %v."+
+                    "Raft.Ready took too long to process: %s"+
                         " Num entries: %d. MustSync: %v",
-                    time.Since(start).Round(time.Millisecond), diskDur.Round(time.Millisecond),
-                    len(rd.Entries), rd.MustSync)
+                    timer.String(), len(rd.Entries), rd.MustSync)
             }
         }
     }

Review comment on `x.LatencyMs.M(float64(timer.Total())/1e6)`: could divide by time.MilliSecond instead here.
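On that suggestion: dividing the nanosecond count by 1e6 and dividing by float64(time.Millisecond) yield the same millisecond value, since time.Millisecond is exactly 1e6 nanoseconds; the named constant just states the unit explicitly. A small standalone sketch (not Dgraph code; a fixed duration stands in for timer.Total()):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	total := 250 * time.Millisecond // stand-in for timer.Total()

	// time.Duration is an int64 nanosecond count, so both expressions
	// produce the same float64 number of milliseconds.
	byLiteral := float64(total) / 1e6
	byConstant := float64(total) / float64(time.Millisecond)

	fmt.Println(byLiteral, byConstant) // 250 250
}
```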
@@ -1214,7 +1232,7 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
     }
 
     if num := posting.Oracle().NumPendingTxns(); num > 0 {
-        glog.Infof("Num pending txns: %d", num)
+        glog.V(2).Infof("Num pending txns: %d", num)
     }
     // We can't rely upon the Raft entries to determine the minPendingStart,
     // because there are many cases during mutations where we don't commit or
@@ -379,10 +379,14 @@ func (b *BytesBuffer) TruncateBy(n int) {
     AssertTrue(b.off >= 0 && b.sz >= 0)
 }
 
+type record struct {
+    Name string
+    Dur  time.Duration
+}
 type Timer struct {
     start   time.Time
     last    time.Time
-    records []time.Duration
+    records []record
 }
 
 func (t *Timer) Start() {
@@ -391,18 +395,24 @@ func (t *Timer) Start() {
     t.records = t.records[:0]
 }
 
-func (t *Timer) Record() {
+func (t *Timer) Record(name string) {
     now := time.Now()
-    t.records = append(t.records, now.Sub(t.last))
+    t.records = append(t.records, record{
+        Name: name,
+        Dur:  now.Sub(t.last).Round(time.Millisecond),
+    })
     t.last = now
 }
 
 func (t *Timer) Total() time.Duration {
-    return time.Since(t.start)
+    return time.Since(t.start).Round(time.Millisecond)
 }
 
-func (t *Timer) All() []time.Duration {
-    return t.records
+func (t *Timer) String() string {
+    sort.Slice(t.records, func(i, j int) bool {
+        return t.records[i].Dur > t.records[j].Dur
+    })
+    return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records)
 }
 
 // PredicateLang extracts the language from a predicate (or facet) name.

Review comment on the sort.Slice call: Is sorting helpful in viewing the results?
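To make the reworked Timer concrete (and to give the sorting question some context), here is a hypothetical usage sketch mirroring the calls added in Run(): Start once per Ready batch, Record with a stage name after each phase, String for the warning line. The Timer code is copied from this diff except Start, whose body is only partially visible above and is reconstructed here; the stage durations and sample output are made up:

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Copy of the types added in this PR, trimmed for illustration.
type record struct {
	Name string
	Dur  time.Duration
}

type Timer struct {
	start   time.Time
	last    time.Time
	records []record
}

// Start is reconstructed from context; only the records reset appears in the diff.
func (t *Timer) Start() {
	t.start = time.Now()
	t.last = t.start
	t.records = t.records[:0]
}

func (t *Timer) Record(name string) {
	now := time.Now()
	t.records = append(t.records, record{Name: name, Dur: now.Sub(t.last).Round(time.Millisecond)})
	t.last = now
}

func (t *Timer) Total() time.Duration { return time.Since(t.start).Round(time.Millisecond) }

func (t *Timer) String() string {
	// Sorting puts the slowest stage first, which is what you usually scan for
	// in the "took too long" warning; chronological order is the alternative.
	sort.Slice(t.records, func(i, j int) bool { return t.records[i].Dur > t.records[j].Dur })
	return fmt.Sprintf("Timer Total: %s. Breakdown: %v", t.Total(), t.records)
}

func main() {
	var timer Timer
	timer.Start()
	time.Sleep(30 * time.Millisecond) // stand-in for SaveToStorage
	timer.Record("disk")
	time.Sleep(10 * time.Millisecond) // stand-in for applying proposals
	timer.Record("proposals")
	timer.Record("advance")
	fmt.Println(timer.String())
	// e.g. Timer Total: 40ms. Breakdown: [{disk 30ms} {proposals 10ms} {advance 0s}]
}
```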
Review comment: atomic package issues here? This is 64 bit. https://golang.org/pkg/sync/atomic/#pkg-note-BUG
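This appears to refer to the heartbeatsOut/heartbeatsIn int64 fields added to Node, the only atomically accessed 64-bit values in this diff. sync/atomic's 64-bit functions require the int64 to be 64-bit aligned, and on 32-bit platforms only the first word of an allocated struct is guaranteed to be aligned (see the linked package note). A hedged sketch of the usual mitigation, keeping atomically accessed counters at the top of the struct; the node type below is a simplified stand-in, not the real Node definition:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is a simplified stand-in for the real Node struct. Keeping the int64
// counters as the first fields guarantees 64-bit alignment even on 32-bit
// platforms, which sync/atomic's 64-bit functions require.
type node struct {
	heartbeatsOut int64 // accessed atomically; must stay 64-bit aligned
	heartbeatsIn  int64 // accessed atomically; must stay 64-bit aligned

	id uint64 // other fields follow the atomic counters
}

func main() {
	n := &node{id: 1}

	// Count an outgoing heartbeat (as Send does in this PR) ...
	atomic.AddInt64(&n.heartbeatsOut, 1)

	// ... and read-and-reset the counters (as ReportRaftComms does).
	out := atomic.SwapInt64(&n.heartbeatsOut, 0)
	in := atomic.SwapInt64(&n.heartbeatsIn, 0)
	fmt.Printf("heartbeats out: %d, in: %d\n", out, in)
}
```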